From fc970ef01c0f5e0cd1724c852ed9bdb69e487ae0 Mon Sep 17 00:00:00 2001 From: Michael Busch Date: Wed, 21 Jul 2010 10:27:20 +0000 Subject: [PATCH] LUCENE-2324: Committing second version of the patch to the real-time branch. It's not done yet, but easier to track progress using the branch. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@966168 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/lucene/index/DocumentsWriter.java | 2156 +++++------------ .../org/apache/lucene/index/IndexWriter.java | 711 +----- .../lucene/index/TermsHashPerThread.java | 114 - 3 files changed, 742 insertions(+), 2239 deletions(-) delete mode 100644 lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java index 1a61cfc0f74..3aa00d9d5a0 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -1,15 +1,42 @@ package org.apache.lucene.index; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DocumentsWriterThreadPool.ThreadState; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; + /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,662 +44,312 @@ package org.apache.lucene.index; * limitations under the License. 
*/ -import java.io.IOException; -import java.io.PrintStream; -import java.text.NumberFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.HashSet; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.codecs.Codec; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Similarity; -import org.apache.lucene.search.Weight; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.RAMFile; -import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.Constants; -import org.apache.lucene.util.ThreadInterruptedException; -import org.apache.lucene.util.RamUsageEstimator; - -/** - * This class accepts multiple added documents and directly - * writes a single segment file. It does this more - * efficiently than creating a single segment per document - * (with DocumentWriter) and doing standard merges on those - * segments. - * - * Each added document is passed to the {@link DocConsumer}, - * which in turn processes the document and interacts with - * other consumers in the indexing chain. Certain - * consumers, like {@link StoredFieldsWriter} and {@link - * TermVectorsTermsWriter}, digest a document and - * immediately write bytes to the "doc store" files (ie, - * they do not consume RAM per document, except while they - * are processing the document). - * - * Other consumers, eg {@link FreqProxTermsWriter} and - * {@link NormsWriter}, buffer bytes in RAM and flush only - * when a new segment is produced. - - * Once we have used our allowed RAM buffer, or the number - * of added docs is large enough (in the case we are - * flushing by doc count instead of RAM usage), we create a - * real segment and flush it to the Directory. - * - * Threads: - * - * Multiple threads are allowed into addDocument at once. - * There is an initial synchronized call to getThreadState - * which allocates a ThreadState for this thread. The same - * thread will get the same ThreadState over time (thread - * affinity) so that if there are consistent patterns (for - * example each thread is indexing a different content - * source) then we make better use of RAM. Then - * processDocument is called on that ThreadState without - * synchronization (most of the "heavy lifting" is in this - * call). Finally the synchronized "finishDocument" is - * called to flush changes to the directory. - * - * When flush is called by IndexWriter we forcefully idle - * all threads and flush only once they are all idle. This - * means you can call flush with a given thread even while - * other threads are actively adding/deleting documents. - * - * - * Exceptions: - * - * Because this class directly updates in-memory posting - * lists, and flushes stored fields and term vectors - * directly to files in the directory, there are certain - * limited times when an exception can corrupt this state. - * For example, a disk full while flushing stored fields - * leaves this file in a corrupt state. Or, an OOM - * exception while appending to the in-memory posting lists - * can corrupt that posting list. We call such exceptions - * "aborting exceptions". In these cases we must call - * abort() to discard all docs added since the last flush. 
- * - * All other exceptions ("non-aborting exceptions") can - * still partially update the index structures. These - * updates are consistent, but, they represent only a part - * of the document seen up until the exception was hit. - * When this happens, we immediately mark the document as - * deleted so that the document is always atomically ("all - * or none") added to the index. - */ - final class DocumentsWriter { + private long sequenceID; + private int numDocumentsWriterPerThreads; - IndexWriter writer; - Directory directory; + private final BufferedDeletesInRAM deletesInRAM = new BufferedDeletesInRAM(); + private final DocumentsWriterThreadPool threadPool; + private final Lock sequenceIDLock = new ReentrantLock(); - String segment; // Current segment we are working on - private String docStoreSegment; // Current doc-store segment we are writing - private int docStoreOffset; // Current starting doc-store offset of current segment + private final Directory directory; + final IndexWriter indexWriter; + final IndexWriterConfig config; - private int nextDocID; // Next docID to be added - private int numDocsInRAM; // # docs buffered in RAM - int numDocsInStore; // # docs written to doc stores - - // Max # ThreadState instances; if there are more threads - // than this they share ThreadStates - private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0]; - private final HashMap threadBindings = new HashMap(); - - private int pauseThreads; // Non-zero when we need all threads to - // pause (eg to flush) - boolean flushPending; // True when a thread has decided to flush - boolean bufferIsFull; // True when it's time to write segment - private boolean aborting; // True if an abort is pending - - private DocFieldProcessor docFieldProcessor; - - PrintStream infoStream; - int maxFieldLength = IndexWriterConfig.UNLIMITED_FIELD_LENGTH; - Similarity similarity; - - // max # simultaneous threads; if there are more than - // this, they wait for others to finish first - private final int maxThreadStates; - - List newFiles; - - static class DocState { - DocumentsWriter docWriter; - Analyzer analyzer; - int maxFieldLength; - PrintStream infoStream; - Similarity similarity; - int docID; - Document doc; - String maxTermPrefix; - - // Only called by asserts - public boolean testPoint(String name) { - return docWriter.writer.testPoint(name); - } - - public void clear() { - // don't hold onto doc nor analyzer, in case it is - // largish: - doc = null; - analyzer = null; - } - } - - /** Consumer returns this on each doc. This holds any - * state that must be flushed synchronized "in docID - * order". We gather these and flush them in order. */ - abstract static class DocWriter { - DocWriter next; - int docID; - abstract void finish() throws IOException; - abstract void abort(); - abstract long sizeInBytes(); - - void setNext(DocWriter next) { - this.next = next; - } - } - - /** - * Create and return a new DocWriterBuffer. - */ - PerDocBuffer newPerDocBuffer() { - return new PerDocBuffer(); - } - - /** - * RAMFile buffer for DocWriters. - */ - class PerDocBuffer extends RAMFile { - - /** - * Allocate bytes used from shared pool. - */ - protected byte[] newBuffer(int size) { - assert size == PER_DOC_BLOCK_SIZE; - return perDocAllocator.getByteBlock(); - } - - /** - * Recycle the bytes used. 
- */ - synchronized void recycle() { - if (buffers.size() > 0) { - setLength(0); - - // Recycle the blocks - perDocAllocator.recycleByteBlocks(buffers); - buffers.clear(); - sizeInBytes = 0; - - assert numBuffers() == 0; - } - } - } - - /** - * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method - * which returns the DocConsumer that the DocumentsWriter calls to process the - * documents. - */ - abstract static class IndexingChain { - abstract DocConsumer getChain(DocumentsWriter documentsWriter); - } - - static final IndexingChain defaultIndexingChain = new IndexingChain() { - - @Override - DocConsumer getChain(DocumentsWriter documentsWriter) { - /* - This is the current indexing chain: - - DocConsumer / DocConsumerPerThread - --> code: DocFieldProcessor / DocFieldProcessorPerThread - --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField - --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField - --> code: DocInverter / DocInverterPerThread / DocInverterPerField - --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField - --> code: TermsHash / TermsHashPerThread / TermsHashPerField - --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField - --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField - --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField - --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField - --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField - --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField - */ - - // Build up indexing chain: - - final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter); - final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter(); - - final InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter, - new TermsHash(documentsWriter, false, termVectorsWriter, null)); - final NormsWriter normsWriter = new NormsWriter(); - final DocInverter docInverter = new DocInverter(termsHash, normsWriter); - return new DocFieldProcessor(documentsWriter, docInverter); - } - }; - - final DocConsumer consumer; - - // Deletes done after the last flush; these are discarded - // on abort - private BufferedDeletes deletesInRAM = new BufferedDeletes(false); - - // Deletes done before the last flush; these are still - // kept on abort - private BufferedDeletes deletesFlushed = new BufferedDeletes(true); - - // The max number of delete terms that can be buffered before - // they must be flushed to disk. - private int maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS; - - // How much RAM we can use before flushing. This is 0 if - // we are flushing by doc count instead. - private long ramBufferSize = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024); - private long waitQueuePauseBytes = (long) (ramBufferSize*0.1); - private long waitQueueResumeBytes = (long) (ramBufferSize*0.05); - - // If we've allocated 5% over our RAM budget, we then - // free down to 95% - private long freeLevel = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95); - - // Flush @ this number of docs. If ramBufferSize is - // non-zero we will flush by RAM usage instead. 
- private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS; - - private int flushedDocCount; // How many docs already flushed to index - - synchronized void updateFlushedDocCount(int n) { - flushedDocCount += n; - } - synchronized int getFlushedDocCount() { - return flushedDocCount; - } - synchronized void setFlushedDocCount(int n) { - flushedDocCount = n; - } + private int maxBufferedDocs; + private double maxBufferSizeMB; + private int maxBufferedDeleteTerms; private boolean closed; + private AtomicInteger numDocsInRAM = new AtomicInteger(0); + private AtomicLong ramUsed = new AtomicLong(0); - DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates) throws IOException { + private long flushedSequenceID = -1; + private final PrintStream infoStream; + + private Map minSequenceIDsPerThread = new HashMap(); + + public DocumentsWriter(Directory directory, IndexWriter indexWriter, IndexWriterConfig config) { this.directory = directory; - this.writer = writer; - this.similarity = writer.getConfig().getSimilarity(); - this.maxThreadStates = maxThreadStates; - flushedDocCount = writer.maxDoc(); - - consumer = indexingChain.getChain(this); - if (consumer instanceof DocFieldProcessor) { - docFieldProcessor = (DocFieldProcessor) consumer; - } + this.indexWriter = indexWriter; + this.config = config; + this.maxBufferedDocs = config.getMaxBufferedDocs(); + this.threadPool = config.getIndexerThreadPool(); + this.infoStream = indexWriter.getInfoStream(); } - /** Returns true if any of the fields in the current - * buffered docs have omitTermFreqAndPositions==false */ - boolean hasProx() { - return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx() - : true; - } - - /** If non-null, various details of indexing are printed - * here. */ - synchronized void setInfoStream(PrintStream infoStream) { - this.infoStream = infoStream; - for(int i=0;i openFiles = new ArrayList(); - final List closedFiles = new ArrayList(); - - /* Returns Collection of files in use by this instance, - * including any flushed segments. */ - @SuppressWarnings("unchecked") - synchronized List openFiles() { - return (List) ((ArrayList) openFiles).clone(); - } - - @SuppressWarnings("unchecked") - synchronized List closedFiles() { - return (List) ((ArrayList) closedFiles).clone(); - } - - synchronized void addOpenFile(String name) { - assert !openFiles.contains(name); - openFiles.add(name); - } - - synchronized void removeOpenFile(String name) { - assert openFiles.contains(name); - openFiles.remove(name); - closedFiles.add(name); - } - - synchronized void setAborting() { - aborting = true; - } - - /** Called if we hit an exception at a bad time (when - * updating the index files) and must discard all - * currently buffered docs. This resets our state, - * discarding any docs added since last flush. 
*/ - synchronized void abort() throws IOException { - - try { - if (infoStream != null) { - message("docWriter: now abort"); - } - - // Forcefully remove waiting ThreadStates from line - waitQueue.abort(); - - // Wait for all other threads to finish with - // DocumentsWriter: - pauseAllThreads(); - - try { - - assert 0 == waitQueue.numWaiting; - - waitQueue.waitingBytes = 0; - - try { - abortedFiles = openFiles(); - } catch (Throwable t) { - abortedFiles = null; - } - - deletesInRAM.clear(); - deletesFlushed.clear(); - - openFiles.clear(); - - for(int i=0;i= 0; - if (0 == pauseThreads) - notifyAll(); - } - - private synchronized boolean allThreadsIdle() { - for(int i=0;i 0; - - assert nextDocID == numDocsInRAM; - assert waitQueue.numWaiting == 0; - assert waitQueue.waitingBytes == 0; - - initFlushState(false); - - docStoreOffset = numDocsInStore; - - if (infoStream != null) - message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM); - - boolean success = false; + boolean anyChanges() { + return numDocsInRAM.get() != 0 || + deletesInRAM.hasDeletes(); + } + DocumentsWriterPerThread newDocumentsWriterPerThread() { + DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(directory, this, config + .getIndexingChain()); + sequenceIDLock.lock(); try { - - if (closeDocStore) { - assert flushState.docStoreSegmentName != null; - assert flushState.docStoreSegmentName.equals(flushState.segmentName); - closeDocStore(); - flushState.numDocsInStore = 0; - } - - Collection threads = new HashSet(); - for(int i=0;i() { + @Override + public Long process(final DocumentsWriterPerThread perThread) throws IOException { + long perThreadRAMUsedBeforeAdd = perThread.numBytesUsed; + perThread.addDocument(doc, analyzer); + + final long sequenceID; + sequenceIDLock.lock(); + try { + ensureOpen(); + sequenceID = nextSequenceID(); + if (delTerm != null) { + deletesInRAM.addDeleteTerm(delTerm, sequenceID, numDocumentsWriterPerThreads); + } + perThread.commitDocument(sequenceID); + if (!minSequenceIDsPerThread.containsKey(perThread)) { + minSequenceIDsPerThread.put(perThread, sequenceID); + } + numDocsInRAM.incrementAndGet(); + } finally { + sequenceIDLock.unlock(); + } + + if (finishAddDocument(perThread, perThreadRAMUsedBeforeAdd)) { + super.clearThreadBindings(); + indexWriter.maybeMerge(); + } + return sequenceID; + } + }); + } + + private final boolean finishAddDocument(DocumentsWriterPerThread perThread, + long perThreadRAMUsedBeforeAdd) throws IOException { + int numDocsPerThread = perThread.getNumDocsInRAM(); + boolean flushed = maybeFlushPerThread(perThread); + if (flushed) { + int oldValue = numDocsInRAM.get(); + while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numDocsPerThread)) { + oldValue = numDocsInRAM.get(); + } + + sequenceIDLock.lock(); + try { + minSequenceIDsPerThread.remove(perThread); + updateFlushedSequenceID(); + } finally { + sequenceIDLock.unlock(); } } - assert waitQueue.waitingBytes == 0; + long deltaRAM = perThread.numBytesUsed - perThreadRAMUsedBeforeAdd; + long oldValue = ramUsed.get(); + while (!ramUsed.compareAndSet(oldValue, oldValue + deltaRAM)) { + oldValue = ramUsed.get(); + } - return flushState.numDocs; + return flushed; + } + + long bufferDeleteTerms(final Term[] terms) throws IOException { + sequenceIDLock.lock(); + try { + ensureOpen(); + final long sequenceID = nextSequenceID(); + deletesInRAM.addDeleteTerms(terms, sequenceID, numDocumentsWriterPerThreads); + return sequenceID; + } finally { + sequenceIDLock.unlock(); + } + } + 
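// Illustrative sketch (not part of the patch; hypothetical names, no Lucene types): updateDocument()
// and the bufferDelete*() methods above all follow one pattern -- take sequenceIDLock, check the
// writer is still open, draw the next sequence ID, and record the buffered operation under that ID,
// while minSequenceIDsPerThread remembers the oldest still-unflushed ID per thread so that
// updateFlushedSequenceID() can advance flushedSequenceID. A stripped-down, self-contained version
// of that bookkeeping could look roughly like this:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class SequenceIdBookkeeping {
  private final Lock lock = new ReentrantLock();
  private long sequenceID;              // last sequence ID handed out
  private long flushedSequenceID = -1;  // every operation <= this ID has been flushed
  private final Map<Long, Long> minSeqPerThread = new HashMap<Long, Long>();

  /** Called while a thread buffers a document or delete; returns the assigned sequence ID. */
  long record(long threadId) {
    lock.lock();
    try {
      final long id = ++sequenceID;
      if (!minSeqPerThread.containsKey(threadId)) {
        minSeqPerThread.put(threadId, id);  // oldest un-flushed sequence ID of this thread
      }
      return id;
    } finally {
      lock.unlock();
    }
  }

  /** Called after a per-thread flush; advances the globally flushed sequence ID. */
  void threadFlushed(long threadId) {
    lock.lock();
    try {
      minSeqPerThread.remove(threadId);
      long min = Long.MAX_VALUE;
      for (long perThreadMin : minSeqPerThread.values()) {
        min = Math.min(min, perThreadMin);
      }
      flushedSequenceID = min;  // stays at MAX_VALUE when no thread has buffered work left
    } finally {
      lock.unlock();
    }
  }

  long flushedSequenceID() {
    lock.lock();
    try {
      return flushedSequenceID;
    } finally {
      lock.unlock();
    }
  }
}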
+ long bufferDeleteTerm(final Term term) throws IOException { + sequenceIDLock.lock(); + try { + ensureOpen(); + final long sequenceID = nextSequenceID(); + deletesInRAM.addDeleteTerm(term, sequenceID, numDocumentsWriterPerThreads); + return sequenceID; + } finally { + sequenceIDLock.unlock(); + } + } + + long bufferDeleteQueries(final Query[] queries) throws IOException { + sequenceIDLock.lock(); + try { + ensureOpen(); + final long sequenceID = nextSequenceID(); + for (Query q : queries) { + deletesInRAM.addDeleteQuery(q, sequenceID, numDocumentsWriterPerThreads); + } + return sequenceID; + } finally { + sequenceIDLock.unlock(); + } + } + + long bufferDeleteQuery(final Query query) throws IOException { + sequenceIDLock.lock(); + try { + ensureOpen(); + final long sequenceID = nextSequenceID(); + deletesInRAM.addDeleteQuery(query, sequenceID, numDocumentsWriterPerThreads); + return sequenceID; + } finally { + sequenceIDLock.unlock(); + } + } + + private final void updateFlushedSequenceID() { + long newFlushedID = Long.MAX_VALUE; + for (long minSeqIDPerThread : minSequenceIDsPerThread.values()) { + if (minSeqIDPerThread < newFlushedID) { + newFlushedID = minSeqIDPerThread; + } + } + + this.flushedSequenceID = newFlushedID; + } + + final boolean flushAllThreads(final boolean flushDocStores, final boolean flushDeletes) + throws IOException { + return threadPool.executeAllThreads(new DocumentsWriterThreadPool.AllThreadsTask() { + @Override + public Boolean process(Iterator threadsIterator) throws IOException { + boolean anythingFlushed = false; + + if (flushDeletes) { + synchronized (indexWriter) { + if (applyDeletes(indexWriter.segmentInfos)) { + indexWriter.checkpoint(); + } + } + } + + while (threadsIterator.hasNext()) { + boolean perThreadFlushDocStores = flushDocStores; + DocumentsWriterPerThread perThread = threadsIterator.next(); + final int numDocs = perThread.getNumDocsInRAM(); + + // Always flush docs if there are any + boolean flushDocs = numDocs > 0; + + String docStoreSegment = perThread.getDocStoreSegment(); + if (docStoreSegment == null) { + perThreadFlushDocStores = false; + } + int docStoreOffset = perThread.getDocStoreOffset(); + boolean docStoreIsCompoundFile = false; + if (perThreadFlushDocStores + && (!flushDocs || !perThread.getSegment().equals(perThread.getDocStoreSegment()))) { + // We must separately flush the doc store + if (infoStream != null) { + message(" flush shared docStore segment " + docStoreSegment); + } + docStoreIsCompoundFile = flushDocStores(perThread); + flushDocStores(perThread); + perThreadFlushDocStores = false; + } + + String segment = perThread.getSegment(); + + // If we are flushing docs, segment must not be null: + assert segment != null || !flushDocs; + + if (flushDocs) { + SegmentInfo newSegment = perThread.flush(perThreadFlushDocStores); + + if (newSegment != null) { + anythingFlushed = true; + + if (0 == docStoreOffset && perThreadFlushDocStores) { + // This means we are flushing private doc stores + // with this segment, so it will not be shared + // with other segments + assert docStoreSegment != null; + assert docStoreSegment.equals(segment); + docStoreOffset = -1; + docStoreSegment = null; + docStoreIsCompoundFile = false; + } + newSegment.setDocStore(docStoreOffset, docStoreSegment, docStoreIsCompoundFile); + + IndexWriter.setDiagnostics(newSegment, "flush"); + finishFlushedSegment(newSegment, perThread); + } + } + } + + if (anythingFlushed) { + clearThreadBindings(); + + sequenceIDLock.lock(); + try { + flushedSequenceID = sequenceID; + 
} finally { + sequenceIDLock.unlock(); + } + numDocsInRAM.set(0); + } + + if (flushDeletes) { + deletesInRAM.clear(); + } + + + return anythingFlushed; + } + }); } /** Build compound file for the segment we just flushed */ - void createCompoundFile(String segment) throws IOException { + void createCompoundFile(String segment, DocumentsWriterPerThread perThread) throws IOException { CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION)); - for(String fileName : flushState.flushedFiles) { + for(String fileName : perThread.flushState.flushedFiles) { cfsWriter.addFile(fileName); } @@ -680,924 +357,395 @@ final class DocumentsWriter { cfsWriter.close(); } - /** Set flushPending if it is not already set and returns - * whether it was set. This is used by IndexWriter to - * trigger a single flush even when multiple threads are - * trying to do so. */ - synchronized boolean setFlushPending() { - if (flushPending) - return false; - else { - flushPending = true; - return true; - } - } - - synchronized void clearFlushPending() { - bufferIsFull = false; - flushPending = false; - } - - synchronized void pushDeletes() { - deletesFlushed.update(deletesInRAM); - } - - synchronized void close() { - closed = true; - notifyAll(); - } - - synchronized void initSegmentName(boolean onlyDocStore) { - if (segment == null && (!onlyDocStore || docStoreSegment == null)) { - segment = writer.newSegmentName(); - assert numDocsInRAM == 0; - } - if (docStoreSegment == null) { - docStoreSegment = segment; - assert numDocsInStore == 0; - } - } - - /** Returns a free (idle) ThreadState that may be used for - * indexing this one document. This call also pauses if a - * flush is pending. If delTerm is non-null then we - * buffer this deleted term after the thread state has - * been acquired. */ - synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException { - - // First, find a thread state. If this thread already - // has affinity to a specific ThreadState, use that one - // again. - DocumentsWriterThreadState state = threadBindings.get(Thread.currentThread()); - if (state == null) { - - // First time this thread has called us since last - // flush. 
Find the least loaded thread state: - DocumentsWriterThreadState minThreadState = null; - for(int i=0;i= maxThreadStates)) { - state = minThreadState; - state.numThreads++; - } else { - // Just create a new "private" thread state - DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length]; - if (threadStates.length > 0) - System.arraycopy(threadStates, 0, newArray, 0, threadStates.length); - state = newArray[threadStates.length] = new DocumentsWriterThreadState(this); - threadStates = newArray; - } - threadBindings.put(Thread.currentThread(), state); - } - - // Next, wait until my thread state is idle (in case - // it's shared with other threads) and for threads to - // not be paused nor a flush pending: - waitReady(state); - - // Allocate segment name if this is the first doc since - // last flush: - initSegmentName(false); - - state.isIdle = false; - - boolean success = false; - try { - state.docState.docID = nextDocID; - - assert writer.testPoint("DocumentsWriter.ThreadState.init start"); - - if (delTerm != null) { - addDeleteTerm(delTerm, state.docState.docID); - state.doFlushAfter = timeToFlushDeletes(); - } - - assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm"); - - nextDocID++; - numDocsInRAM++; - - // We must at this point commit to flushing to ensure we - // always get N docs when we flush by doc count, even if - // > 1 thread is adding documents: - if (!flushPending && - maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH - && numDocsInRAM >= maxBufferedDocs) { - flushPending = true; - state.doFlushAfter = true; - } - - success = true; - } finally { - if (!success) { - // Forcefully idle this ThreadState: - state.isIdle = true; - notifyAll(); - if (state.doFlushAfter) { - state.doFlushAfter = false; - flushPending = false; - } - } - } - - return state; - } - - /** Returns true if the caller (IndexWriter) should now - * flush. 
*/ - boolean addDocument(Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - return updateDocument(doc, analyzer, null); - } - - boolean updateDocument(Term t, Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - return updateDocument(doc, analyzer, t); - } - - boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm) - throws CorruptIndexException, IOException { - - // This call is synchronized but fast - final DocumentsWriterThreadState state = getThreadState(doc, delTerm); - - final DocState docState = state.docState; - docState.doc = doc; - docState.analyzer = analyzer; - - boolean success = false; - try { - // This call is not synchronized and does all the - // work - final DocWriter perDoc; + // nocommit + void finishFlushedSegment(SegmentInfo newSegment, DocumentsWriterPerThread perThread) throws IOException { + synchronized(indexWriter) { + indexWriter.segmentInfos.add(newSegment); + indexWriter.checkpoint(); + SegmentReader reader = indexWriter.readerPool.get(newSegment, false); + boolean any = false; try { - perDoc = state.consumer.processDocument(); + any = applyDeletes(reader, newSegment.getMinSequenceID(), newSegment.getMaxSequenceID(), perThread.sequenceIDs); } finally { - docState.clear(); + indexWriter.readerPool.release(reader); } - - // This call is synchronized but fast - finishDocument(state, perDoc); - - success = true; - } finally { - if (!success) { - synchronized(this) { - - if (aborting) { - state.isIdle = true; - notifyAll(); - abort(); - } else { - skipDocWriter.docID = docState.docID; - boolean success2 = false; - try { - waitQueue.add(skipDocWriter); - success2 = true; - } finally { - if (!success2) { - state.isIdle = true; - notifyAll(); - abort(); - return false; - } + if (any) { + indexWriter.checkpoint(); + } + + if (indexWriter.mergePolicy.useCompoundFile(indexWriter.segmentInfos, newSegment)) { + // Now build compound file + boolean success = false; + try { + createCompoundFile(newSegment.name, perThread); + success = true; + } finally { + if (!success) { + if (infoStream != null) { + message("hit exception " + + "reating compound file for newly flushed segment " + newSegment.name); } - - state.isIdle = true; - notifyAll(); - - // If this thread state had decided to flush, we - // must clear it so another thread can flush - if (state.doFlushAfter) { - state.doFlushAfter = false; - flushPending = false; - notifyAll(); - } - - // Immediately mark this document as deleted - // since likely it was partially added. 
This - // keeps indexing as "all or none" (atomic) when - // adding a document: - addDeleteDocID(state.docState.docID); + indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "", + IndexFileNames.COMPOUND_FILE_EXTENSION)); } } + + newSegment.setUseCompoundFile(true); + indexWriter.checkpoint(); } } - - return state.doFlushAfter || timeToFlushDeletes(); } - // for testing - synchronized int getNumBufferedDeleteTerms() { - return deletesInRAM.numTerms; - } - - // for testing - synchronized Map getBufferedDeleteTerms() { - return deletesInRAM.terms; - } - - /** Called whenever a merge has completed and the merged segments had deletions */ - synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) { - if (docMaps == null) - // The merged segments had no deletes so docIDs did not change and we have nothing to do - return; - MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); - deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); - deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); - flushedDocCount -= mapper.docShift; - } - - synchronized private void waitReady(DocumentsWriterThreadState state) { - - while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) { + + private boolean flushDocStores(DocumentsWriterPerThread perThread) throws IOException { + boolean useCompoundDocStore = false; + + String docStoreSegment; + + boolean success = false; try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - if (closed) - throw new AlreadyClosedException("this IndexWriter is closed"); - } - - boolean bufferDeleteTerms(Term[] terms) throws IOException { - synchronized(this) { - waitReady(null); - for (int i = 0; i < terms.length; i++) - addDeleteTerm(terms[i], numDocsInRAM); - } - return timeToFlushDeletes(); - } - - boolean bufferDeleteTerm(Term term) throws IOException { - synchronized(this) { - waitReady(null); - addDeleteTerm(term, numDocsInRAM); - } - return timeToFlushDeletes(); - } - - boolean bufferDeleteQueries(Query[] queries) throws IOException { - synchronized(this) { - waitReady(null); - for (int i = 0; i < queries.length; i++) - addDeleteQuery(queries[i], numDocsInRAM); - } - return timeToFlushDeletes(); - } - - boolean bufferDeleteQuery(Query query) throws IOException { - synchronized(this) { - waitReady(null); - addDeleteQuery(query, numDocsInRAM); - } - return timeToFlushDeletes(); - } - - synchronized boolean deletesFull() { - return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && - (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || - (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && - ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms)); - } - - synchronized boolean doApplyDeletes() { - // Very similar to deletesFull(), except we don't count - // numBytesUsed, because we are checking whether - // deletes (alone) are consuming too many resources now - // and thus should be applied. We apply deletes if RAM - // usage is > 1/2 of our allowed RAM buffer, to prevent - // too-frequent flushing of a long tail of tiny segments - // when merges (which always apply deletes) are - // infrequent. 
- return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && - (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize/2) || - (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && - ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms)); - } - - private boolean timeToFlushDeletes() { - balanceRAM(); - synchronized(this) { - return (bufferIsFull || deletesFull()) && setFlushPending(); - } - } - - void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { - this.maxBufferedDeleteTerms = maxBufferedDeleteTerms; - } - - int getMaxBufferedDeleteTerms() { - return maxBufferedDeleteTerms; - } - - synchronized boolean hasDeletes() { - return deletesFlushed.any(); - } - - synchronized boolean applyDeletes(SegmentInfos infos) throws IOException { - - if (!hasDeletes()) - return false; - - final long t0 = System.currentTimeMillis(); - - if (infoStream != null) - message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + - deletesFlushed.docIDs.size() + " deleted docIDs and " + - deletesFlushed.queries.size() + " deleted queries on " + - + infos.size() + " segments."); - - final int infosEnd = infos.size(); - - int docStart = 0; - boolean any = false; - for (int i = 0; i < infosEnd; i++) { - - // Make sure we never attempt to apply deletes to - // segment in external dir - assert infos.info(i).dir == directory; - - SegmentReader reader = writer.readerPool.get(infos.info(i), false); - try { - any |= applyDeletes(reader, docStart); - docStart += reader.maxDoc(); + docStoreSegment = perThread.closeDocStore(); + success = true; } finally { - writer.readerPool.release(reader); + if (!success && infoStream != null) { + message("hit exception closing doc store segment"); + } } - } - - deletesFlushed.clear(); - if (infoStream != null) { - message("apply deletes took " + (System.currentTimeMillis()-t0) + " msec"); - } - - return any; + + useCompoundDocStore = indexWriter.mergePolicy.useCompoundDocStore(indexWriter.segmentInfos); + + if (useCompoundDocStore && docStoreSegment != null && perThread.closedFiles().size() != 0) { + // Now build compound doc store file + + if (infoStream != null) { + message("create compound file " + + IndexFileNames.segmentFileName(docStoreSegment, "", + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION)); + } + + success = false; + + final int numSegments = indexWriter.segmentInfos.size(); + final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); + + try { + CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); + for (final String file : perThread.closedFiles()) { + cfsWriter.addFile(file); + } + + // Perform the merge + cfsWriter.close(); + success = true; + + } finally { + if (!success) { + if (infoStream != null) + message("hit exception building compound file doc store for segment " + docStoreSegment); + synchronized(indexWriter) { + indexWriter.deleter.deleteFile(compoundFileName); + } + abort(); + } + } + + synchronized(indexWriter) { + for (int i = 0; i < numSegments; i++) { + SegmentInfo si = indexWriter.segmentInfos.info(i); + if (si.getDocStoreOffset() != -1 && + si.getDocStoreSegment().equals(docStoreSegment)) + si.setDocStoreIsCompoundFile(true); + } + + indexWriter.checkpoint(); + + // In case the files we just merged into a CFS were + // not previously checkpointed: + indexWriter.deleter.deleteNewFiles(perThread.closedFiles()); + } + } + + return useCompoundDocStore; + + } + + // Returns true if 
an abort is in progress + void pauseAllThreads() { + threadPool.pauseAllThreads(); } - // used only by assert - private Term lastDeleteTerm; + void resumeAllThreads() { + threadPool.resumeAllThreads(); + } - // used only by assert - private boolean checkDeleteTerm(Term term) { - if (term != null) { - assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term; + void close() { + sequenceIDLock.lock(); + try { + closed = true; + } finally { + sequenceIDLock.unlock(); + } + } + + private void ensureOpen() throws AlreadyClosedException { + if (closed) { + throw new AlreadyClosedException("this IndexWriter is closed"); + } + } + + private final boolean maybeFlushPerThread(DocumentsWriterPerThread perThread) throws IOException { + if (perThread.getNumDocsInRAM() == maxBufferedDocs) { + flushSegment(perThread, false); + assert perThread.getNumDocsInRAM() == 0; + return true; + } + + return false; + } + + private boolean flushSegment(DocumentsWriterPerThread perThread, boolean flushDocStores) + throws IOException { + if (perThread.getNumDocsInRAM() == 0 && !flushDocStores) { + return false; + } + + int docStoreOffset = perThread.getDocStoreOffset(); + String docStoreSegment = perThread.getDocStoreSegment(); + SegmentInfo newSegment = perThread.flush(flushDocStores); + + if (newSegment != null) { + newSegment.setDocStore(docStoreOffset, docStoreSegment, false); + finishFlushedSegment(newSegment, perThread); + return true; + } + return false; + } + + void abort() throws IOException { + threadPool.abort(); + try { + try { + abortedFiles = openFiles(); + } catch (Throwable t) { + abortedFiles = null; + } + + deletesInRAM.clear(); + // nocommit + // deletesFlushed.clear(); + + openFiles.clear(); + } finally { + threadPool.finishAbort(); + } + + } + + final List openFiles = new ArrayList(); + private Collection abortedFiles; // List of files that were written before last abort() + + /* + * Returns Collection of files in use by this instance, + * including any flushed segments. 
+ */ + @SuppressWarnings("unchecked") + List openFiles() { + synchronized(openFiles) { + return (List) ((ArrayList) openFiles).clone(); + } + } + + + Collection abortedFiles() { + return abortedFiles; + } + + boolean hasDeletes() { + return deletesInRAM.hasDeletes(); + } + + // nocommit + int getNumDocsInRAM() { + return numDocsInRAM.get(); + } + + // nocommit + long getRAMUsed() { + return ramUsed.get(); + } + + // nocommit + // long getRAMUsed() { + // return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; + // } + + boolean applyDeletes(SegmentInfos infos) throws IOException { + synchronized(indexWriter) { + if (!hasDeletes()) + return false; + + final long t0 = System.currentTimeMillis(); + + if (infoStream != null) { + message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " + + +infos.size() + " segments."); + } + + final int infosEnd = infos.size(); + + boolean any = false; + for (int i = 0; i < infosEnd; i++) { + + // Make sure we never attempt to apply deletes to + // segment in external dir + assert infos.info(i).dir == directory; + + SegmentInfo si = infos.info(i); + SegmentReader reader = indexWriter.readerPool.get(si, false); + try { + any |= applyDeletes(reader, si.getMinSequenceID(), si.getMaxSequenceID(), null); + } finally { + indexWriter.readerPool.release(reader); + } + } + + if (infoStream != null) { + message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec"); + } + + return any; } - lastDeleteTerm = term; - return true; } // Apply buffered delete terms, queries and docIDs to the // provided reader - private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) - throws CorruptIndexException, IOException { + final boolean applyDeletes(IndexReader reader, long minSequenceID, long maxSequenceID, long[] sequenceIDs) + throws CorruptIndexException, IOException { + + assert sequenceIDs == null || sequenceIDs.length >= reader.maxDoc() : "reader.maxDoc=" + + reader.maxDoc() + ",sequenceIDs.length=" + sequenceIDs.length; - final int docEnd = docIDStart + reader.maxDoc(); boolean any = false; - assert checkDeleteTerm(null); - - // Delete by term - if (deletesFlushed.terms.size() > 0) { - Fields fields = reader.fields(); - if (fields == null) { - // This reader has no postings - return false; - } - - TermsEnum termsEnum = null; - - String currentField = null; - DocsEnum docs = null; - - for (Entry entry: deletesFlushed.terms.entrySet()) { - Term term = entry.getKey(); - // Since we visit terms sorted, we gain performance - // by re-using the same TermsEnum and seeking only - // forwards - if (term.field() != currentField) { - assert currentField == null || currentField.compareTo(term.field()) < 0; - currentField = term.field(); - Terms terms = fields.terms(currentField); - if (terms != null) { - termsEnum = terms.iterator(); - } else { - termsEnum = null; - } - } - - if (termsEnum == null) { - continue; - } - assert checkDeleteTerm(term); - - if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) { - DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs); - - if (docsEnum != null) { - docs = docsEnum; - int limit = entry.getValue().getNum(); - while (true) { - final int docID = docs.nextDoc(); - if (docID == DocsEnum.NO_MORE_DOCS || docIDStart+docID >= limit) { - break; - } - reader.deleteDocument(docID); - any = true; - } - } + // first: delete the documents that had non-aborting exceptions + if (sequenceIDs != null) { + for (int i = 0; i < reader.maxDoc(); i++) { + if 
(sequenceIDs[i] == -1) { + reader.deleteDocument(i); + any = true; } } } - - // Delete by docID - for (Integer docIdInt : deletesFlushed.docIDs) { - int docID = docIdInt.intValue(); - if (docID >= docIDStart && docID < docEnd) { - reader.deleteDocument(docID-docIDStart); - any = true; - } - } - - // Delete by query - if (deletesFlushed.queries.size() > 0) { + + if (deletesInRAM.hasDeletes()) { IndexSearcher searcher = new IndexSearcher(reader); - try { - for (Entry entry : deletesFlushed.queries.entrySet()) { - Query query = entry.getKey(); - int limit = entry.getValue().intValue(); - Weight weight = query.weight(searcher); - Scorer scorer = weight.scorer(reader, true, false); - if (scorer != null) { - while(true) { - int doc = scorer.nextDoc(); - if (((long) docIDStart) + doc >= limit) - break; - reader.deleteDocument(doc); - any = true; + + SortedMap deletes = deletesInRAM.deletes.getReadCopy(); + + SortedMap deleteTerms = new TreeMap(); + for (Entry entry : deletes.entrySet()) { + if (minSequenceID < entry.getKey()) { + BufferedDeletesInRAM.Delete delete = entry.getValue(); + if (delete instanceof BufferedDeletesInRAM.DeleteTerm) { + BufferedDeletesInRAM.DeleteTerm deleteTerm = (BufferedDeletesInRAM.DeleteTerm) delete; + deleteTerms.put(deleteTerm.term, entry.getKey()); + } else if (delete instanceof BufferedDeletesInRAM.DeleteTerms) { + BufferedDeletesInRAM.DeleteTerms terms = (BufferedDeletesInRAM.DeleteTerms) delete; + for (Term t : terms.terms) { + deleteTerms.put(t, entry.getKey()); + } + } else { + // delete query + BufferedDeletesInRAM.DeleteQuery deleteQuery = (BufferedDeletesInRAM.DeleteQuery) delete; + Query query = deleteQuery.query; + Weight weight = query.weight(searcher); + Scorer scorer = weight.scorer(reader, true, false); + if (scorer != null) { + while (true) { + int doc = scorer.nextDoc(); + if (doc == DocsEnum.NO_MORE_DOCS) { + break; + } + if ( (sequenceIDs != null && sequenceIDs[doc] < entry.getKey()) + || (sequenceIDs == null && maxSequenceID < entry.getKey())) { + reader.deleteDocument(doc); + any = true; + } + } + } + } + } + } + + // Delete by term + if (deleteTerms.size() > 0) { + Fields fields = reader.fields(); + if (fields == null) { + // This reader has no postings + return false; + } + + TermsEnum termsEnum = null; + + String currentField = null; + BytesRef termRef = new BytesRef(); + DocsEnum docs = null; + + for (Entry entry : deleteTerms.entrySet()) { + Term term = entry.getKey(); + // Since we visit terms sorted, we gain performance + // by re-using the same TermsEnum and seeking only + // forwards + if (term.field() != currentField) { + assert currentField == null || currentField.compareTo(term.field()) < 0; + currentField = term.field(); + Terms terms = fields.terms(currentField); + if (terms != null) { + termsEnum = terms.iterator(); + } else { + termsEnum = null; + } + } + + if (termsEnum == null) { + continue; + } + // assert checkDeleteTerm(term); + + termRef.copy(term.text()); + + if (termsEnum.seek(termRef, false) == TermsEnum.SeekStatus.FOUND) { + DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs); + + if (docsEnum != null) { + docs = docsEnum; + // int limit = entry.getValue().getNum(); + while (true) { + final int doc = docs.nextDoc(); + // if (docID == DocsEnum.NO_MORE_DOCS || docIDStart+docID >= limit) { + if (doc == DocsEnum.NO_MORE_DOCS) { + break; + } + if ( (sequenceIDs != null && sequenceIDs[doc] < entry.getValue()) + || (sequenceIDs == null && maxSequenceID < entry.getValue())) { + reader.deleteDocument(doc); + any = 
true; + } + } } } } - } finally { - searcher.close(); } } + return any; } - // Buffer a term in bufferedDeleteTerms, which records the - // current number of documents buffered in ram so that the - // delete term will be applied to those documents as well - // as the disk segments. - synchronized private void addDeleteTerm(Term term, int docCount) { - BufferedDeletes.Num num = deletesInRAM.terms.get(term); - final int docIDUpto = flushedDocCount + docCount; - if (num == null) - deletesInRAM.terms.put(term, new BufferedDeletes.Num(docIDUpto)); - else - num.setNum(docIDUpto); - deletesInRAM.numTerms++; - - deletesInRAM.addBytesUsed(BYTES_PER_DEL_TERM + term.bytes.length); - } - - // Buffer a specific docID for deletion. Currently only - // used when we hit a exception when adding a document - synchronized private void addDeleteDocID(int docID) { - deletesInRAM.docIDs.add(Integer.valueOf(flushedDocCount+docID)); - deletesInRAM.addBytesUsed(BYTES_PER_DEL_DOCID); - } - - synchronized private void addDeleteQuery(Query query, int docID) { - deletesInRAM.queries.put(query, Integer.valueOf(flushedDocCount + docID)); - deletesInRAM.addBytesUsed(BYTES_PER_DEL_QUERY); - } - - /** Does the synchronized work to finish/flush the - * inverted document. */ - private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException { - - // Must call this w/o holding synchronized(this) else - // we'll hit deadlock: - balanceRAM(); - - synchronized(this) { - - assert docWriter == null || docWriter.docID == perThread.docState.docID; - - if (aborting) { - - // We are currently aborting, and another thread is - // waiting for me to become idle. We just forcefully - // idle this threadState; it will be fully reset by - // abort() - if (docWriter != null) - try { - docWriter.abort(); - } catch (Throwable t) { - } - - perThread.isIdle = true; - notifyAll(); - return; - } - - final boolean doPause; - - if (docWriter != null) - doPause = waitQueue.add(docWriter); - else { - skipDocWriter.docID = perThread.docState.docID; - doPause = waitQueue.add(skipDocWriter); - } - - if (doPause) - waitForWaitQueue(); - - if (bufferIsFull && !flushPending) { - flushPending = true; - perThread.doFlushAfter = true; - } - - perThread.isIdle = true; - notifyAll(); + void message(String message) { + if (infoStream != null) { + indexWriter.message("DW: " + message); } } - synchronized void waitForWaitQueue() { - do { - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } while (!waitQueue.doResume()); - } - - private static class SkipDocWriter extends DocWriter { - @Override - void finish() { - } - @Override - void abort() { - } - @Override - long sizeInBytes() { - return 0; - } - } - final SkipDocWriter skipDocWriter = new SkipDocWriter(); - - long getRAMUsed() { - return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; - } - - long numBytesUsed; - - NumberFormat nf = NumberFormat.getInstance(); - - // Coarse estimates used to measure RAM usage of buffered deletes - final static int OBJECT_HEADER_BYTES = 8; - final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4; - final static int INT_NUM_BYTE = 4; - final static int CHAR_NUM_BYTE = 2; - - /* Rough logic: HashMap has an array[Entry] w/ varying - load factor (say 2 * POINTER). Entry is object w/ Term - key, BufferedDeletes.Num val, int hash, Entry next - (OBJ_HEADER + 3*POINTER + INT). Term is object w/ - String field and String text (OBJ_HEADER + 2*POINTER). 
- We don't count Term's field since it's interned. - Term's text is String (OBJ_HEADER + 4*INT + POINTER + - OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is - OBJ_HEADER + INT. */ - - final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE; - - /* Rough logic: del docIDs are List. Say list - allocates ~2X size (2*POINTER). Integer is OBJ_HEADER - + int */ - final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE; - - /* Rough logic: HashMap has an array[Entry] w/ varying - load factor (say 2 * POINTER). Entry is object w/ - Query key, Integer val, int hash, Entry next - (OBJ_HEADER + 3*POINTER + INT). Query we often - undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */ - final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24; - - /* Initial chunks size of the shared byte[] blocks used to - store postings data */ - final static int BYTE_BLOCK_SHIFT = 15; - final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT; - final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1; - final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK; - - /* if you increase this, you must fix field cache impl for - * getTerms/getTermsIndex requires <= 32768 */ - final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2; - - private class ByteBlockAllocator extends ByteBlockPool.Allocator { - final int blockSize; - - ByteBlockAllocator(int blockSize) { - this.blockSize = blockSize; - } - - ArrayList freeByteBlocks = new ArrayList(); - - /* Allocate another byte[] from the shared pool */ - @Override - byte[] getByteBlock() { - synchronized(DocumentsWriter.this) { - final int size = freeByteBlocks.size(); - final byte[] b; - if (0 == size) { - b = new byte[blockSize]; - numBytesUsed += blockSize; - } else - b = freeByteBlocks.remove(size-1); - return b; - } - } - - /* Return byte[]'s to the pool */ - - @Override - void recycleByteBlocks(byte[][] blocks, int start, int end) { - synchronized(DocumentsWriter.this) { - for(int i=start;i blocks) { - synchronized(DocumentsWriter.this) { - final int size = blocks.size(); - for(int i=0;i freeIntBlocks = new ArrayList(); - - /* Allocate another int[] from the shared pool */ - synchronized int[] getIntBlock() { - final int size = freeIntBlocks.size(); - final int[] b; - if (0 == size) { - b = new int[INT_BLOCK_SIZE]; - numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE; - } else - b = freeIntBlocks.remove(size-1); - return b; - } - - synchronized void bytesUsed(long numBytes) { - numBytesUsed += numBytes; - } - - /* Return int[]s to the pool */ - synchronized void recycleIntBlocks(int[][] blocks, int start, int end) { - for(int i=start;i= ramBufferSize; - } - - if (doBalance) { - - if (infoStream != null) - message(" RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) + - " vs trigger=" + toMB(ramBufferSize) + - " deletesMB=" + toMB(deletesRAMUsed) + - " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) + - " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE)); - - final long startBytesUsed = numBytesUsed + deletesRAMUsed; - - int iter = 0; - - // We free equally from each pool in 32 KB - // chunks until we are below our threshold - // (freeLevel) - - boolean any = true; - - while(numBytesUsed+deletesRAMUsed > freeLevel) { - - synchronized(this) { - if (0 == perDocAllocator.freeByteBlocks.size() && - 0 == byteBlockAllocator.freeByteBlocks.size() && - 0 == freeIntBlocks.size() 
&& !any) { - // Nothing else to free -- must flush now. - bufferIsFull = numBytesUsed+deletesRAMUsed > ramBufferSize; - if (infoStream != null) { - if (numBytesUsed+deletesRAMUsed > ramBufferSize) - message(" nothing to free; now set bufferIsFull"); - else - message(" nothing to free"); - } - break; - } - - if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) { - byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1); - numBytesUsed -= BYTE_BLOCK_SIZE; - } - - if ((1 == iter % 4) && freeIntBlocks.size() > 0) { - freeIntBlocks.remove(freeIntBlocks.size()-1); - numBytesUsed -= INT_BLOCK_SIZE * INT_NUM_BYTE; - } - - if ((2 == iter % 4) && perDocAllocator.freeByteBlocks.size() > 0) { - // Remove upwards of 32 blocks (each block is 1K) - for (int i = 0; i < 32; ++i) { - perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1); - numBytesUsed -= PER_DOC_BLOCK_SIZE; - if (perDocAllocator.freeByteBlocks.size() == 0) { - break; - } - } - } - } - - if ((3 == iter % 4) && any) - // Ask consumer to free any recycled state - any = consumer.freeRAM(); - - iter++; - } - - if (infoStream != null) - message(" after free: freedMB=" + nf.format((startBytesUsed-numBytesUsed-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((numBytesUsed+deletesRAMUsed)/1024./1024.)); - } - } - - final WaitQueue waitQueue = new WaitQueue(); - - private class WaitQueue { - DocWriter[] waiting; - int nextWriteDocID; - int nextWriteLoc; - int numWaiting; - long waitingBytes; - - public WaitQueue() { - waiting = new DocWriter[10]; - } - - synchronized void reset() { - // NOTE: nextWriteLoc doesn't need to be reset - assert numWaiting == 0; - assert waitingBytes == 0; - nextWriteDocID = 0; - } - - synchronized boolean doResume() { - return waitingBytes <= waitQueueResumeBytes; - } - - synchronized boolean doPause() { - return waitingBytes > waitQueuePauseBytes; - } - - synchronized void abort() { - int count = 0; - for(int i=0;i= nextWriteDocID; - - if (doc.docID == nextWriteDocID) { - writeDocument(doc); - while(true) { - doc = waiting[nextWriteLoc]; - if (doc != null) { - numWaiting--; - waiting[nextWriteLoc] = null; - waitingBytes -= doc.sizeInBytes(); - writeDocument(doc); - } else - break; - } - } else { - - // I finished before documents that were added - // before me. This can easily happen when I am a - // small doc and the docs before me were large, or, - // just due to luck in the thread scheduling. Just - // add myself to the queue and when that large doc - // finishes, it will flush me: - int gap = doc.docID - nextWriteDocID; - if (gap >= waiting.length) { - // Grow queue - DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)]; - assert nextWriteLoc >= 0; - System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc); - System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc); - nextWriteLoc = 0; - waiting = newArray; - gap = doc.docID - nextWriteDocID; - } - - int loc = nextWriteLoc + gap; - if (loc >= waiting.length) - loc -= waiting.length; - - // We should only wrap one time - assert loc < waiting.length; - - // Nobody should be in my spot! 
- assert waiting[loc] == null; - waiting[loc] = doc; - numWaiting++; - waitingBytes += doc.sizeInBytes(); - } - - return doPause(); - } - } } diff --git a/lucene/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/src/java/org/apache/lucene/index/IndexWriter.java index 563446499e2..2f53492e617 100644 --- a/lucene/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/src/java/org/apache/lucene/index/IndexWriter.java @@ -240,7 +240,7 @@ public class IndexWriter implements Closeable { * printed to infoStream, if set (see {@link * #setInfoStream}). */ - public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH_UTF8; + public final static int MAX_TERM_LENGTH = DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8; // The normal read buffer size defaults to 1024, but // increasing this during merging seems to yield @@ -271,10 +271,12 @@ public class IndexWriter implements Closeable { volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit()) volatile long pendingCommitChangeCount; - private SegmentInfos segmentInfos = new SegmentInfos(); // the segments + // nocommit - private + SegmentInfos segmentInfos = new SegmentInfos(); // the segments private DocumentsWriter docWriter; - private IndexFileDeleter deleter; + //nocommit - private + IndexFileDeleter deleter; private Set segmentsToOptimize = new HashSet(); // used by optimize to note those needing optimization @@ -289,8 +291,8 @@ public class IndexWriter implements Closeable { // Holds all SegmentInfo instances currently involved in // merges private HashSet mergingSegments = new HashSet(); - - private MergePolicy mergePolicy; + // nocommit - private + MergePolicy mergePolicy; // TODO 4.0: this should be made final once the setter is removed private /*final*/MergeScheduler mergeScheduler; private LinkedList pendingMerges = new LinkedList(); @@ -733,113 +735,6 @@ public class IndexWriter implements Closeable { throw new IllegalArgumentException("this method can only be called when the merge policy is the default LogMergePolicy"); } - /**

Get the current setting of whether newly flushed - * segments will use the compound file format. Note that - * this just returns the value previously set with - * setUseCompoundFile(boolean), or the default value - * (true). You cannot use this to query the status of - * previously flushed segments.

- * - *

Note that this method is a convenience method: it - * just calls mergePolicy.getUseCompoundFile as long as - * mergePolicy is an instance of {@link LogMergePolicy}. - * Otherwise an IllegalArgumentException is thrown.

- * - * @see #setUseCompoundFile(boolean) - * @deprecated use {@link LogMergePolicy#getUseCompoundDocStore()} and - * {@link LogMergePolicy#getUseCompoundFile()} directly. - */ - public boolean getUseCompoundFile() { - return getLogMergePolicy().getUseCompoundFile(); - } - - /** - *
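For context on the removals above and below: per the @deprecated tags, compound-file behavior is now configured on the merge policy rather than on IndexWriter. A minimal sketch of that replacement usage, assuming an already-built IndexWriterConfig; only the method names cited in the javadoc are taken as given, the class and method names of the example itself are hypothetical:

    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.LogByteSizeMergePolicy;

    class CompoundFileConfigExample {
      // Illustrative only: 'conf' is an IndexWriterConfig built elsewhere by the caller.
      static void enableCompoundFiles(IndexWriterConfig conf) {
        LogByteSizeMergePolicy lmp = new LogByteSizeMergePolicy();
        lmp.setUseCompoundFile(true);      // pack each flushed segment's files into a single .cfs
        lmp.setUseCompoundDocStore(true);  // likewise for the shared doc store files
        conf.setMergePolicy(lmp);          // stands in for the removed IndexWriter.setUseCompoundFile(boolean)
      }
    }
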

- * Setting to turn on usage of a compound file. When on, multiple files for - * each segment are merged into a single file when a new segment is flushed. - *

- * - *

- * Note that this method is a convenience method: it just calls - * mergePolicy.setUseCompoundFile as long as mergePolicy is an instance of - * {@link LogMergePolicy}. Otherwise an IllegalArgumentException is thrown. - *

- * - * @deprecated use {@link LogMergePolicy#setUseCompoundDocStore(boolean)} and - * {@link LogMergePolicy#setUseCompoundFile(boolean)} directly. - * Note that this method set the given value on both, therefore - * you should consider doing the same. - */ - public void setUseCompoundFile(boolean value) { - getLogMergePolicy().setUseCompoundFile(value); - getLogMergePolicy().setUseCompoundDocStore(value); - } - - /** Expert: Set the Similarity implementation used by this IndexWriter. - * - * @see Similarity#setDefault(Similarity) - * @deprecated use {@link IndexWriterConfig#setSimilarity(Similarity)} instead - */ - public void setSimilarity(Similarity similarity) { - ensureOpen(); - this.similarity = similarity; - docWriter.setSimilarity(similarity); - // Required so config.getSimilarity returns the right value. But this will - // go away together with the method in 4.0. - config.setSimilarity(similarity); - } - - /** Expert: Return the Similarity implementation used by this IndexWriter. - * - *
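Likewise, the Similarity setter removed above is superseded by the config-level call named in its @deprecated tag. A short hedged sketch; DefaultSimilarity is used only as a stand-in for whatever Similarity the application needs, and the wrapper class is hypothetical:

    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.search.DefaultSimilarity;

    class SimilarityConfigExample {
      // Illustrative only: replaces the removed IndexWriter.setSimilarity(Similarity).
      static void configureSimilarity(IndexWriterConfig conf) {
        conf.setSimilarity(new DefaultSimilarity());  // any Similarity implementation works here
      }
    }
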

This defaults to the current value of {@link Similarity#getDefault()}. - * @deprecated use {@link IndexWriterConfig#getSimilarity()} instead - */ - public Similarity getSimilarity() { - ensureOpen(); - return similarity; - } - - /** Expert: Set the interval between indexed terms. Large values cause less - * memory to be used by IndexReader, but slow random-access to terms. Small - * values cause more memory to be used by an IndexReader, and speed - * random-access to terms. - * - * This parameter determines the amount of computation required per query - * term, regardless of the number of documents that contain that term. In - * particular, it is the maximum number of other terms that must be - * scanned before a term is located and its frequency and position information - * may be processed. In a large index with user-entered query terms, query - * processing time is likely to be dominated not by term lookup but rather - * by the processing of frequency and positional data. In a small index - * or when many uncommon query terms are generated (e.g., by wildcard - * queries) term lookup may become a dominant cost. - * - * In particular, numUniqueTerms/interval terms are read into - * memory by an IndexReader, and, on average, interval/2 terms - * must be scanned for each random term access. - * - * @see #DEFAULT_TERM_INDEX_INTERVAL - * @deprecated use {@link IndexWriterConfig#setTermIndexInterval(int)} - */ - public void setTermIndexInterval(int interval) { - ensureOpen(); - this.termIndexInterval = interval; - // Required so config.getTermIndexInterval returns the right value. But this - // will go away together with the method in 4.0. - config.setTermIndexInterval(interval); - } - - /** Expert: Return the interval between indexed terms. - * - * @see #setTermIndexInterval(int) - * @deprecated use {@link IndexWriterConfig#getTermIndexInterval()} - */ - public int getTermIndexInterval() { - // We pass false because this method is called by SegmentMerger while we are in the process of closing - ensureOpen(false); - return termIndexInterval; - } - /** * Constructs an IndexWriter for the index in d. * Text will be analyzed with a. If create @@ -1028,7 +923,6 @@ public class IndexWriter implements Closeable { directory = d; analyzer = conf.getAnalyzer(); setMessageID(defaultInfoStream); - maxFieldLength = conf.getMaxFieldLength(); termIndexInterval = conf.getTermIndexInterval(); writeLockTimeout = conf.getWriteLockTimeout(); similarity = conf.getSimilarity(); @@ -1102,9 +996,10 @@ public class IndexWriter implements Closeable { setRollbackSegmentInfos(segmentInfos); - docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain(), conf.getMaxThreadStates()); - docWriter.setInfoStream(infoStream); - docWriter.setMaxFieldLength(maxFieldLength); + docWriter = new DocumentsWriter(directory, this, conf); + // nocommit + //docWriter.setInfoStream(infoStream); + //docWriter.setMaxFieldLength(maxFieldLength); // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: @@ -1167,149 +1062,6 @@ public class IndexWriter implements Closeable { return config; } - /** - * Expert: set the merge policy used by this writer. - * - * @deprecated use {@link IndexWriterConfig#setMergePolicy(MergePolicy)} instead. 
- */ - public void setMergePolicy(MergePolicy mp) { - ensureOpen(); - if (mp == null) - throw new NullPointerException("MergePolicy must be non-null"); - - if (mergePolicy != mp) - mergePolicy.close(); - mergePolicy = mp; - mergePolicy.setIndexWriter(this); - pushMaxBufferedDocs(); - if (infoStream != null) - message("setMergePolicy " + mp); - // Required so config.getMergePolicy returns the right value. But this will - // go away together with the method in 4.0. - config.setMergePolicy(mp); - } - - /** - * Expert: returns the current MergePolicy in use by this writer. - * @see #setMergePolicy - * - * @deprecated use {@link IndexWriterConfig#getMergePolicy()} instead - */ - public MergePolicy getMergePolicy() { - ensureOpen(); - return mergePolicy; - } - - /** - * Expert: set the merge scheduler used by this writer. - * @deprecated use {@link IndexWriterConfig#setMergeScheduler(MergeScheduler)} instead - */ - synchronized public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException { - ensureOpen(); - if (mergeScheduler == null) - throw new NullPointerException("MergeScheduler must be non-null"); - - if (this.mergeScheduler != mergeScheduler) { - finishMerges(true); - this.mergeScheduler.close(); - } - this.mergeScheduler = mergeScheduler; - if (infoStream != null) - message("setMergeScheduler " + mergeScheduler); - // Required so config.getMergeScheduler returns the right value. But this will - // go away together with the method in 4.0. - config.setMergeScheduler(mergeScheduler); - } - - /** - * Expert: returns the current MergeScheduler in use by this - * writer. - * @see #setMergeScheduler(MergeScheduler) - * @deprecated use {@link IndexWriterConfig#getMergeScheduler()} instead - */ - public MergeScheduler getMergeScheduler() { - ensureOpen(); - return mergeScheduler; - } - - /**

Determines the largest segment (measured by - * document count) that may be merged with other segments. - * Small values (e.g., less than 10,000) are best for - * interactive indexing, as this limits the length of - * pauses while indexing to a few seconds. Larger values - * are best for batched indexing and speedier - * searches.

- * - *

The default value is {@link Integer#MAX_VALUE}.

- * - *

Note that this method is a convenience method: it - * just calls mergePolicy.setMaxMergeDocs as long as - * mergePolicy is an instance of {@link LogMergePolicy}. - * Otherwise an IllegalArgumentException is thrown.

- * - *

The default merge policy ({@link - * LogByteSizeMergePolicy}) also allows you to set this - * limit by net size (in MB) of the segment, using {@link - * LogByteSizeMergePolicy#setMaxMergeMB}.

- * @deprecated use {@link LogMergePolicy#setMaxMergeDocs(int)} directly. - */ - public void setMaxMergeDocs(int maxMergeDocs) { - getLogMergePolicy().setMaxMergeDocs(maxMergeDocs); - } - - /** - *
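As the @deprecated tag above indicates, this per-merge document limit now lives on the merge policy, which can also cap merges by size. A minimal sketch under the same assumptions as the earlier examples; the numeric values are arbitrary and the wrapper class is hypothetical:

    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.LogByteSizeMergePolicy;

    class MergeLimitsExample {
      // Illustrative only: both limits are configured on the merge policy itself.
      static void configureMergeLimits(IndexWriterConfig conf) {
        LogByteSizeMergePolicy mp = new LogByteSizeMergePolicy();
        mp.setMaxMergeDocs(10000);  // cap merged segments by document count
        mp.setMaxMergeMB(512.0);    // or cap them by net segment size in MB
        conf.setMergePolicy(mp);
      }
    }
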

Returns the largest segment (measured by document - * count) that may be merged with other segments.

- * - *

Note that this method is a convenience method: it - * just calls mergePolicy.getMaxMergeDocs as long as - * mergePolicy is an instance of {@link LogMergePolicy}. - * Otherwise an IllegalArgumentException is thrown.

- * - * @see #setMaxMergeDocs - * @deprecated use {@link LogMergePolicy#getMaxMergeDocs()} directly. - */ - public int getMaxMergeDocs() { - return getLogMergePolicy().getMaxMergeDocs(); - } - - /** - * The maximum number of terms that will be indexed for a single field in a - * document. This limits the amount of memory required for indexing, so that - * collections with very large files will not crash the indexing process by - * running out of memory. This setting refers to the number of running terms, - * not to the number of different terms.

- * Note: this silently truncates large documents, excluding from the - * index all terms that occur further in the document. If you know your source - * documents are large, be sure to set this value high enough to accomodate - * the expected size. If you set it to Integer.MAX_VALUE, then the only limit - * is your memory, but you should anticipate an OutOfMemoryError.

- * By default, no more than {@link #DEFAULT_MAX_FIELD_LENGTH} terms - * will be indexed for a field. - * @deprecated use {@link IndexWriterConfig#setMaxFieldLength(int)} instead - */ - public void setMaxFieldLength(int maxFieldLength) { - ensureOpen(); - this.maxFieldLength = maxFieldLength; - docWriter.setMaxFieldLength(maxFieldLength); - if (infoStream != null) - message("setMaxFieldLength " + maxFieldLength); - // Required so config.getMaxFieldLength returns the right value. But this - // will go away together with the method in 4.0. - config.setMaxFieldLength(maxFieldLength); - } - - /** - * Returns the maximum number of terms that will be - * indexed for a single field in a document. - * @see #setMaxFieldLength - * @deprecated use {@link IndexWriterConfig#getMaxFieldLength()} instead - */ - public int getMaxFieldLength() { - ensureOpen(); - return maxFieldLength; - } - /** Determines the minimal number of documents required * before the buffered in-memory documents are flushed as * a new Segment. Large values generally gives faster @@ -1543,7 +1295,8 @@ public class IndexWriter implements Closeable { public void setInfoStream(PrintStream infoStream) { ensureOpen(); setMessageID(infoStream); - docWriter.setInfoStream(infoStream); + // nocommit + //docWriter.setInfoStream(infoStream); deleter.setInfoStream(infoStream); if (infoStream != null) messageState(); @@ -1571,48 +1324,6 @@ public class IndexWriter implements Closeable { return infoStream != null; } - /** - * Sets the maximum time to wait for a write lock (in milliseconds) for this instance of IndexWriter. @see - * @see #setDefaultWriteLockTimeout to change the default value for all instances of IndexWriter. - * @deprecated use {@link IndexWriterConfig#setWriteLockTimeout(long)} instead - */ - public void setWriteLockTimeout(long writeLockTimeout) { - ensureOpen(); - this.writeLockTimeout = writeLockTimeout; - // Required so config.getWriteLockTimeout returns the right value. But this - // will go away together with the method in 4.0. - config.setWriteLockTimeout(writeLockTimeout); - } - - /** - * Returns allowed timeout when acquiring the write lock. - * @see #setWriteLockTimeout - * @deprecated use {@link IndexWriterConfig#getWriteLockTimeout()} - */ - public long getWriteLockTimeout() { - ensureOpen(); - return writeLockTimeout; - } - - /** - * Sets the default (for any instance of IndexWriter) maximum time to wait for a write lock (in - * milliseconds). - * @deprecated use {@link IndexWriterConfig#setDefaultWriteLockTimeout(long)} instead - */ - public static void setDefaultWriteLockTimeout(long writeLockTimeout) { - IndexWriterConfig.setDefaultWriteLockTimeout(writeLockTimeout); - } - - /** - * Returns default write lock timeout for newly - * instantiated IndexWriters. - * @see #setDefaultWriteLockTimeout - * @deprecated use {@link IndexWriterConfig#getDefaultWriteLockTimeout()} instead - */ - public static long getDefaultWriteLockTimeout() { - return IndexWriterConfig.getDefaultWriteLockTimeout(); - } - /** * Commits all changes to an index and closes all * associated files. 
Note that this may be a costly @@ -1774,8 +1485,9 @@ public class IndexWriter implements Closeable { closing = false; notifyAll(); if (!closed) { - if (docWriter != null) + if (docWriter != null) { docWriter.resumeAllThreads(); + } if (infoStream != null) message("hit exception while closing"); } @@ -1783,76 +1495,6 @@ public class IndexWriter implements Closeable { } } - /** Tells the docWriter to close its currently open shared - * doc stores (stored fields & vectors files). - * Return value specifices whether new doc store files are compound or not. - */ - private synchronized boolean flushDocStores() throws IOException { - - boolean useCompoundDocStore = false; - - String docStoreSegment; - - boolean success = false; - try { - docStoreSegment = docWriter.closeDocStore(); - success = true; - } finally { - if (!success && infoStream != null) { - message("hit exception closing doc store segment"); - } - } - - useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos); - - if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) { - // Now build compound doc store file - - if (infoStream != null) { - message("create compound file " + IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION)); - } - - success = false; - - final int numSegments = segmentInfos.size(); - final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); - - try { - CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); - for (final String file : docWriter.closedFiles() ) { - cfsWriter.addFile(file); - } - - // Perform the merge - cfsWriter.close(); - success = true; - - } finally { - if (!success) { - if (infoStream != null) - message("hit exception building compound file doc store for segment " + docStoreSegment); - deleter.deleteFile(compoundFileName); - docWriter.abort(); - } - } - - for(int i=0;i - * Note that this effectively truncates large documents, excluding from the - * index terms that occur further in the document. If you know your source - * documents are large, be sure to set this value high enough to accommodate - * the expected size. If you set it to Integer.MAX_VALUE, then the only limit - * is your memory, but you should anticipate an OutOfMemoryError.

- * By default, no more than 10,000 terms will be indexed for a field. - * - * @see MaxFieldLength - */ - private int maxFieldLength; - /** * Adds a document to this index. If the document contains more than * {@link #setMaxFieldLength(int)} terms for a given field, the remainder are @@ -1972,8 +1598,8 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void addDocument(Document doc) throws CorruptIndexException, IOException { - addDocument(doc, analyzer); + public long addDocument(Document doc) throws CorruptIndexException, IOException { + return addDocument(doc, analyzer); } /** @@ -1993,36 +1619,36 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { + public long addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - boolean doFlush = false; boolean success = false; try { try { - doFlush = docWriter.addDocument(doc, analyzer); + long sequenceID = docWriter.addDocument(doc, analyzer); success = true; + return sequenceID; } finally { if (!success) { - - if (infoStream != null) + if (infoStream != null) { message("hit exception adding document"); - + } synchronized (this) { // If docWriter has some aborted files that were // never incref'd, then we clean them up here if (docWriter != null) { final Collection files = docWriter.abortedFiles(); - if (files != null) + if (files != null) { deleter.deleteNewFiles(files); + } } } } } - if (doFlush) - flush(true, false, false); } catch (OutOfMemoryError oom) { handleOOM(oom, "addDocument"); } + + return -1; } /** @@ -2036,15 +1662,14 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void deleteDocuments(Term term) throws CorruptIndexException, IOException { + public long deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); try { - boolean doFlush = docWriter.bufferDeleteTerm(term); - if (doFlush) - flush(true, false, false); + return docWriter.bufferDeleteTerm(term); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term)"); } + return -1; } /** @@ -2060,15 +1685,14 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException { + public long deleteDocuments(Term... 
terms) throws CorruptIndexException, IOException { ensureOpen(); try { - boolean doFlush = docWriter.bufferDeleteTerms(terms); - if (doFlush) - flush(true, false, false); + return docWriter.bufferDeleteTerms(terms); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term..)"); } + return -1; } /** @@ -2082,11 +1706,9 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void deleteDocuments(Query query) throws CorruptIndexException, IOException { + public long deleteDocuments(Query query) throws CorruptIndexException, IOException { ensureOpen(); - boolean doFlush = docWriter.bufferDeleteQuery(query); - if (doFlush) - flush(true, false, false); + return docWriter.bufferDeleteQuery(query); } /** @@ -2102,11 +1724,9 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException { + public long deleteDocuments(Query... queries) throws CorruptIndexException, IOException { ensureOpen(); - boolean doFlush = docWriter.bufferDeleteQueries(queries); - if (doFlush) - flush(true, false, false); + return docWriter.bufferDeleteQueries(queries); } /** @@ -2149,35 +1769,37 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void updateDocument(Term term, Document doc, Analyzer analyzer) + public long updateDocument(Term term, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); try { - boolean doFlush = false; boolean success = false; try { - doFlush = docWriter.updateDocument(term, doc, analyzer); + long sequenceID = docWriter.updateDocument(term, doc, analyzer); success = true; + return sequenceID; } finally { if (!success) { - if (infoStream != null) + if (infoStream != null) { message("hit exception updating document"); + } synchronized (this) { // If docWriter has some aborted files that were // never incref'd, then we clean them up here final Collection files = docWriter.abortedFiles(); - if (files != null) + if (files != null) { deleter.deleteNewFiles(files); + } } } } - if (doFlush) - flush(true, false, false); } catch (OutOfMemoryError oom) { handleOOM(oom, "updateDocument"); } + + return -1; } // for test purpose @@ -2697,7 +2319,8 @@ public class IndexWriter implements Closeable { // Remove any buffered docs docWriter.abort(); - docWriter.setFlushedDocCount(0); + // nocommit + //docWriter.setFlushedDocCount(0); // Remove all segments segmentInfos.clear(); @@ -2790,7 +2413,8 @@ public class IndexWriter implements Closeable { * the index files referenced exist (correctly) in the * index directory. 
*/ - private synchronized void checkpoint() throws IOException { + // nocommit - private + synchronized void checkpoint() throws IOException { changeCount++; deleter.checkpoint(segmentInfos, false); } @@ -2925,7 +2549,8 @@ public class IndexWriter implements Closeable { ensureOpen(); segmentInfos.addAll(infos); // Notify DocumentsWriter that the flushed count just increased - docWriter.updateFlushedDocCount(docCount); + // nocommit + //docWriter.updateFlushedDocCount(docCount); checkpoint(); } @@ -2977,11 +2602,12 @@ public class IndexWriter implements Closeable { checkpoint(); // Notify DocumentsWriter that the flushed count just increased - docWriter.updateFlushedDocCount(docCount); + // nocommit + //docWriter.updateFlushedDocCount(docCount); } // Now create the compound file if needed - if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) { + if (mergePolicy instanceof LogMergePolicy && getLogMergePolicy().getUseCompoundFile()) { List files = null; @@ -3211,183 +2837,17 @@ public class IndexWriter implements Closeable { // synchronized, ie, merges should be allowed to commit // even while a flush is happening private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { - try { - try { - return doFlushInternal(flushDocStores, flushDeletes); - } finally { - docWriter.balanceRAM(); - } - } finally { - docWriter.clearFlushPending(); - } - } - - // TODO: this method should not have to be entirely - // synchronized, ie, merges should be allowed to commit - // even while a flush is happening - private synchronized final boolean doFlushInternal(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { - - if (hitOOM) { - throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush"); - } - - ensureOpen(false); - - assert testPoint("startDoFlush"); - - doBeforeFlush(); - - flushCount++; - - // If we are flushing because too many deletes - // accumulated, then we should apply the deletes to free - // RAM: - flushDeletes |= docWriter.doApplyDeletes(); - - // Make sure no threads are actively adding a document. 
- // Returns true if docWriter is currently aborting, in - // which case we skip flushing this segment - if (infoStream != null) { - message("flush: now pause all indexing threads"); - } - if (docWriter.pauseAllThreads()) { - docWriter.resumeAllThreads(); - return false; - } - - try { - - SegmentInfo newSegment = null; - - final int numDocs = docWriter.getNumDocsInRAM(); - - // Always flush docs if there are any - boolean flushDocs = numDocs > 0; - - String docStoreSegment = docWriter.getDocStoreSegment(); - - assert docStoreSegment != null || numDocs == 0: "dss=" + docStoreSegment + " numDocs=" + numDocs; - - if (docStoreSegment == null) - flushDocStores = false; - - int docStoreOffset = docWriter.getDocStoreOffset(); - - boolean docStoreIsCompoundFile = false; - - if (infoStream != null) { - message(" flush: segment=" + docWriter.getSegment() + - " docStoreSegment=" + docWriter.getDocStoreSegment() + - " docStoreOffset=" + docStoreOffset + - " flushDocs=" + flushDocs + - " flushDeletes=" + flushDeletes + - " flushDocStores=" + flushDocStores + - " numDocs=" + numDocs + - " numBufDelTerms=" + docWriter.getNumBufferedDeleteTerms()); - message(" index before flush " + segString()); - } - - // Check if the doc stores must be separately flushed - // because other segments, besides the one we are about - // to flush, reference it - if (flushDocStores && (!flushDocs || !docWriter.getSegment().equals(docWriter.getDocStoreSegment()))) { - // We must separately flush the doc store - if (infoStream != null) - message(" flush shared docStore segment " + docStoreSegment); - - docStoreIsCompoundFile = flushDocStores(); - flushDocStores = false; - } - - String segment = docWriter.getSegment(); - - // If we are flushing docs, segment must not be null: - assert segment != null || !flushDocs; - - if (flushDocs) { - - boolean success = false; - final int flushedDocCount; - - try { - flushedDocCount = docWriter.flush(flushDocStores); - success = true; - } finally { - if (!success) { - if (infoStream != null) - message("hit exception flushing segment " + segment); - deleter.refresh(segment); - } - } - - if (0 == docStoreOffset && flushDocStores) { - // This means we are flushing private doc stores - // with this segment, so it will not be shared - // with other segments - assert docStoreSegment != null; - assert docStoreSegment.equals(segment); - docStoreOffset = -1; - docStoreIsCompoundFile = false; - docStoreSegment = null; - } - - // Create new SegmentInfo, but do not add to our - // segmentInfos until deletes are flushed - // successfully. 
- newSegment = new SegmentInfo(segment, - flushedDocCount, - directory, false, docStoreOffset, - docStoreSegment, docStoreIsCompoundFile, - docWriter.hasProx(), - docWriter.getCodec()); - - setDiagnostics(newSegment, "flush"); - } - - docWriter.pushDeletes(); - - if (flushDocs) { - segmentInfos.add(newSegment); - checkpoint(); - } - - if (flushDocs && mergePolicy.useCompoundFile(segmentInfos, newSegment)) { - // Now build compound file - boolean success = false; - try { - docWriter.createCompoundFile(segment); - success = true; - } finally { - if (!success) { - if (infoStream != null) - message("hit exception creating compound file for newly flushed segment " + segment); - deleter.deleteFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION)); - } - } - - newSegment.setUseCompoundFile(true); - checkpoint(); - } - - if (flushDeletes) { - applyDeletes(); - } - - if (flushDocs) - checkpoint(); - - doAfterFlush(); - - return flushDocs; - - } catch (OutOfMemoryError oom) { - handleOOM(oom, "doFlush"); - // never hit - return false; - } finally { - docWriter.clearFlushPending(); - docWriter.resumeAllThreads(); - } + return docWriter.flushAllThreads(flushDocStores, flushDeletes); + // nocommit +// try { +// try { +// return doFlushInternal(flushDocStores, flushDeletes); +// } finally { +// docWriter.balanceRAM(); +// } +// } finally { +// docWriter.clearFlushPending(); +// } } /** Expert: Return the total size of all index files currently cached in memory. @@ -3535,7 +2995,8 @@ public class IndexWriter implements Closeable { final int start = ensureContiguousMerge(merge); commitMergedDeletes(merge, mergedReader); - docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); + // nocommit + //docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); setMergeDocStoreIsCompoundFile(merge); merge.info.setHasProx(merger.hasProx()); @@ -3749,7 +3210,8 @@ public class IndexWriter implements Closeable { boolean mergeDocStores = false; boolean doFlushDocStore = false; - final String currentDocStoreSegment = docWriter.getDocStoreSegment(); + // nocommit + //final String currentDocStoreSegment = docWriter.getDocStoreSegment(); // Test each segment to be merged: check if we need to // flush/merge doc stores @@ -3793,9 +3255,10 @@ public class IndexWriter implements Closeable { // If the segment is referencing the current "live" // doc store outputs then we must merge - if (si.getDocStoreOffset() != -1 && currentDocStoreSegment != null && si.getDocStoreSegment().equals(currentDocStoreSegment)) { - doFlushDocStore = true; - } + // nocommit +// if (si.getDocStoreOffset() != -1 && currentDocStoreSegment != null && si.getDocStoreSegment().equals(currentDocStoreSegment)) { +// doFlushDocStore = true; +// } } final int docStoreOffset; @@ -3854,12 +3317,13 @@ public class IndexWriter implements Closeable { // CFS: mergingSegments.add(merge.info); } - - private void setDiagnostics(SegmentInfo info, String source) { + + // nocommit - private + static void setDiagnostics(SegmentInfo info, String source) { setDiagnostics(info, source, null); } - - private void setDiagnostics(SegmentInfo info, String source, Map details) { + + private static void setDiagnostics(SegmentInfo info, String source, Map details) { Map diagnostics = new HashMap(); diagnostics.put("source", source); diagnostics.put("lucene.version", Constants.LUCENE_VERSION); @@ -4003,11 +3467,12 @@ public class IndexWriter implements 
Closeable { // readers will attempt to open an IndexInput // on files that have still-open IndexOutputs // against them: - if (dss.contains(docWriter.getDocStoreSegment())) { - if (infoStream != null) - message("now flush at mergeMiddle"); - doFlush(true, false); - } + // nocommit +// if (dss.contains(docWriter.getDocStoreSegment())) { +// if (infoStream != null) +// message("now flush at mergeMiddle"); +// doFlush(true, false); +// } } for(int i=0;i