diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index e6c76018c3a..c3185b5fb2a 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -182,6 +182,10 @@ API Changes
   is now public, making it easier for applications to decode facet
   ordinals into their corresponding labels (Ankur)
 
+* LUCENE-9515: IndexingChain now accepts individual primitives rather than a
+  DocumentsWriterPerThread instance in order to create a new DocConsumer.
+  (Simon Willnauer)
+
 New Features
 ---------------------
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/ByteSliceWriter.java b/lucene/core/src/java/org/apache/lucene/index/ByteSliceWriter.java
index 75650049aa2..812e8cc6495 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ByteSliceWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ByteSliceWriter.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.util.ByteBlockPool;
+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
 
 /**
@@ -28,6 +29,10 @@ import org.apache.lucene.util.ByteBlockPool;
  */
 final class ByteSliceWriter extends DataOutput {
 
+  /* Initial chunks size of the shared byte[] blocks used to
+     store postings data */
+  private final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
+
   private byte[] slice;
   private int upto;
   private final ByteBlockPool pool;
@@ -80,6 +85,6 @@ final class ByteSliceWriter extends DataOutput {
   }
 
   public int getAddress() {
-    return upto + (offset0 & DocumentsWriterPerThread.BYTE_BLOCK_NOT_MASK);
+    return upto + (offset0 & BYTE_BLOCK_NOT_MASK);
   }
 }
\ No newline at end of file
diff --git a/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java b/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java
index 15e7c8b1a06..b45a8e5f050 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Consumer;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
@@ -40,20 +41,24 @@ import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.similarities.Similarity;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash.MaxBytesLengthExceededException;
 import org.apache.lucene.util.Counter;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.IntBlockPool;
 import org.apache.lucene.util.RamUsageEstimator;
 
 /** Default general purpose indexing chain, which handles
  *  indexing all types of fields. */
 final class DefaultIndexingChain extends DocConsumer {
+
+  final Counter bytesUsed = Counter.newCounter();
-  final DocumentsWriterPerThread docWriter;
   final FieldInfos.Builder fieldInfos;
 
   // Writes postings and term vectors:
@@ -73,21 +78,38 @@ final class DefaultIndexingChain extends DocConsumer {
   // Holds fields seen in each document
   private PerField[] fields = new PerField[1];
   private final InfoStream infoStream;
+  private final ByteBlockPool.Allocator byteBlockAllocator;
+  private final LiveIndexWriterConfig indexWriterConfig;
+  private final int indexCreatedVersionMajor;
+  private final Consumer<Throwable> abortingExceptionConsumer;
+  private boolean hasHitAbortingException;
 
-  DefaultIndexingChain(DocumentsWriterPerThread docWriter) {
-    this.docWriter = docWriter;
-    this.fieldInfos = docWriter.getFieldInfosBuilder();
-    this.infoStream = docWriter.getIndexWriterConfig().getInfoStream();
+  DefaultIndexingChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory, FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
+                       Consumer<Throwable> abortingExceptionConsumer) {
+    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
+    byteBlockAllocator = new ByteBlockPool.DirectTrackingAllocator(bytesUsed);
+    IntBlockPool.Allocator intBlockAllocator = new IntBlockAllocator(bytesUsed);
+    this.indexWriterConfig = indexWriterConfig;
+    assert segmentInfo.getIndexSort() == indexWriterConfig.getIndexSort();
+    this.fieldInfos = fieldInfos;
+    this.infoStream = indexWriterConfig.getInfoStream();
+    this.abortingExceptionConsumer = abortingExceptionConsumer;
 
     final TermsHash termVectorsWriter;
-    if (docWriter.getSegmentInfo().getIndexSort() == null) {
-      storedFieldsConsumer = new StoredFieldsConsumer(docWriter.codec, docWriter.directory, docWriter.getSegmentInfo());
-      termVectorsWriter = new TermVectorsConsumer(docWriter);
+    if (segmentInfo.getIndexSort() == null) {
+      storedFieldsConsumer = new StoredFieldsConsumer(indexWriterConfig.getCodec(), directory, segmentInfo);
+      termVectorsWriter = new TermVectorsConsumer(intBlockAllocator, byteBlockAllocator, directory, segmentInfo, indexWriterConfig.getCodec());
     } else {
-      storedFieldsConsumer = new SortingStoredFieldsConsumer(docWriter.codec, docWriter.directory, docWriter.getSegmentInfo());
-      termVectorsWriter = new SortingTermVectorsConsumer(docWriter);
+      storedFieldsConsumer = new SortingStoredFieldsConsumer(indexWriterConfig.getCodec(), directory, segmentInfo);
+      termVectorsWriter = new SortingTermVectorsConsumer(intBlockAllocator, byteBlockAllocator, directory, segmentInfo, indexWriterConfig.getCodec());
    }
-    termsHash = new FreqProxTermsWriter(docWriter, bytesUsed, termVectorsWriter);
+    termsHash = new FreqProxTermsWriter(intBlockAllocator, byteBlockAllocator, bytesUsed, termVectorsWriter);
+  }
+
+  private void onAbortingException(Throwable th) {
+    assert th != null;
+    this.hasHitAbortingException = true;
+    abortingExceptionConsumer.accept(th);
   }
 
   private LeafReader getDocValuesLeafReader() {
@@ -247,7 +269,7 @@ final class DefaultIndexingChain extends DocConsumer {
       // FreqProxTermsWriter does this with
       // FieldInfo.storePayload.
       t0 = System.nanoTime();
-      docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
+      indexWriterConfig.getCodec().fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write fieldInfos");
       }
@@ -423,7 +445,7 @@ final class DefaultIndexingChain extends DocConsumer {
     try {
       storedFieldsConsumer.startDocument(docID);
     } catch (Throwable th) {
-      docWriter.onAbortingException(th);
+      onAbortingException(th);
       throw th;
     }
   }
@@ -434,7 +456,7 @@ final class DefaultIndexingChain extends DocConsumer {
     try {
       storedFieldsConsumer.finishDocument();
     } catch (Throwable th) {
-      docWriter.onAbortingException(th);
+      onAbortingException(th);
       throw th;
     }
   }
@@ -463,7 +485,7 @@ final class DefaultIndexingChain extends DocConsumer {
         fieldCount = processField(docID, field, fieldGen, fieldCount);
       }
     } finally {
-      if (docWriter.hasHitAbortingException() == false) {
+      if (hasHitAbortingException == false) {
         // Finish each indexed field name seen in the document:
         for (int i=0;i<fieldCount;i++) {
           fields[i].finish(docID);
       attributes.forEach((k, v) -> fi.putAttribute(k, v));
       }
 
-      LiveIndexWriterConfig indexWriterConfig = docWriter.getIndexWriterConfig();
-      fp = new PerField(docWriter.getIndexCreatedVersionMajor(), fi, invert,
+      fp = new PerField(indexCreatedVersionMajor, fi, invert,
           indexWriterConfig.getSimilarity(), indexWriterConfig.getInfoStream(), indexWriterConfig.getAnalyzer());
       fp.next = fieldHash[hashPos];
       fieldHash[hashPos] = fp;
@@ -943,14 +964,14 @@ final class DefaultIndexingChain extends DocConsumer {
           byte[] prefix = new byte[30];
           BytesRef bigTerm = invertState.termAttribute.getBytesRef();
           System.arraycopy(bigTerm.bytes, bigTerm.offset, prefix, 0, 30);
-          String msg = "Document contains at least one immense term in field=\"" + fieldInfo.name + "\" (whose UTF8 encoding is longer than the max length " + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + Arrays.toString(prefix) + "...', original message: " + e.getMessage();
+          String msg = "Document contains at least one immense term in field=\"" + fieldInfo.name + "\" (whose UTF8 encoding is longer than the max length " + IndexWriter.MAX_TERM_LENGTH + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + Arrays.toString(prefix) + "...', original message: " + e.getMessage();
           if (infoStream.isEnabled("IW")) {
             infoStream.message("IW", "ERROR: " + msg);
           }
           // Document will be deleted above:
           throw new IllegalArgumentException(msg, e);
         } catch (Throwable th) {
-          docWriter.onAbortingException(th);
+          onAbortingException(th);
           throw th;
         }
       }
@@ -992,4 +1013,28 @@ final class DefaultIndexingChain extends DocConsumer {
     }
     return null;
   }
+
+  private static class IntBlockAllocator extends IntBlockPool.Allocator {
+    private final Counter bytesUsed;
+
+    IntBlockAllocator(Counter bytesUsed) {
+      super(IntBlockPool.INT_BLOCK_SIZE);
+      this.bytesUsed = bytesUsed;
+    }
+
+    /* Allocate another int[] from the shared pool */
+    @Override
+    public int[] getIntBlock() {
+      int[] b = new int[IntBlockPool.INT_BLOCK_SIZE];
+      bytesUsed.addAndGet(IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES);
+      return b;
+    }
+
+    @Override
+    public void recycleIntBlocks(int[][] blocks, int offset, int length) {
+      bytesUsed.addAndGet(-(length * (IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES)));
+    }
+
+  }
+
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 79e4add1649..c35cae4166d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -18,7 +18,6 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -440,7 +439,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
     flushDeletes.set(true);
   }
 
-  DocumentsWriterPerThread obtainAndLock() throws IOException {
+  DocumentsWriterPerThread obtainAndLock() {
     while (closed == false) {
       final DocumentsWriterPerThread perThread = perThreadPool.getAndLock();
       if (perThread.deleteQueue == documentsWriter.deleteQueue) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 264ef55ff13..c94e8564a63 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
@@ -38,55 +39,44 @@ import org.apache.lucene.store.TrackingDirectoryWrapper;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
-import org.apache.lucene.util.ByteBlockPool.Allocator;
-import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
-import org.apache.lucene.util.Counter;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.IntBlockPool;
 import org.apache.lucene.util.SetOnce;
 import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.Version;
 
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
-
 final class DocumentsWriterPerThread implements Accountable {
 
-  LiveIndexWriterConfig getIndexWriterConfig() {
-    return indexWriterConfig;
-  }
-
   /**
-   * The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
+   * The IndexingChain must define the {@link #getChain(int, SegmentInfo, Directory, FieldInfos.Builder, LiveIndexWriterConfig, Consumer)} method
    * which returns the DocConsumer that the DocumentsWriter calls to process the
    * documents.
    */
   abstract static class IndexingChain {
-    abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException;
+    abstract DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
+                                  FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
+                                  Consumer<Throwable> abortingExceptionConsumer);
   }
 
   private Throwable abortingException;
 
-  final void onAbortingException(Throwable throwable) {
+  private void onAbortingException(Throwable throwable) {
+    assert throwable != null : "aborting exception must not be null";
     assert abortingException == null: "aborting exception has already been set";
     abortingException = throwable;
   }
 
-  final boolean hasHitAbortingException() {
-    return abortingException != null;
-  }
-
   final boolean isAborted() {
     return aborted;
   }
-
   static final IndexingChain defaultIndexingChain = new IndexingChain() {
 
     @Override
-    DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
-      return new DefaultIndexingChain(documentsWriterPerThread);
+    DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
+                         FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
+                         Consumer<Throwable> abortingExceptionConsumer) {
+      return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory, fieldInfos, indexWriterConfig, abortingExceptionConsumer);
     }
   };
@@ -135,8 +125,7 @@ final class DocumentsWriterPerThread implements Accountable {
   final Codec codec;
   final TrackingDirectoryWrapper directory;
   private final DocConsumer consumer;
-  private final Counter bytesUsed;
-
+  
   // Updates for our still-in-RAM (to be flushed next) segment
   private final BufferedUpdates pendingUpdates;
   private final SegmentInfo segmentInfo; // Current segment we are working on
@@ -151,29 +140,23 @@ final class DocumentsWriterPerThread implements Accountable {
   final DocumentsWriterDeleteQueue deleteQueue;
   private final DeleteSlice deleteSlice;
   private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
-  final Allocator byteBlockAllocator;
-  final IntBlockPool.Allocator intBlockAllocator;
   private final AtomicLong pendingNumDocs;
   private final LiveIndexWriterConfig indexWriterConfig;
   private final boolean enableTestPoints;
-  private final int indexVersionCreated;
   private final ReentrantLock lock = new ReentrantLock();
   private int[] deleteDocIDs = new int[0];
   private int numDeletedDocIds = 0;
 
-
-  DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, DocumentsWriterDeleteQueue deleteQueue,
-                           FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
+  DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory,
+                           LiveIndexWriterConfig indexWriterConfig, DocumentsWriterDeleteQueue deleteQueue,
+                           FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) {
     this.directory = new TrackingDirectoryWrapper(directory);
     this.fieldInfos = fieldInfos;
     this.indexWriterConfig = indexWriterConfig;
     this.infoStream = indexWriterConfig.getInfoStream();
     this.codec = indexWriterConfig.getCodec();
     this.pendingNumDocs = pendingNumDocs;
-    bytesUsed = Counter.newCounter();
-    byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
     pendingUpdates = new BufferedUpdates(segmentName);
-    intBlockAllocator = new IntBlockAllocator(bytesUsed);
     this.deleteQueue = Objects.requireNonNull(deleteQueue);
     assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
     deleteSlice = deleteQueue.newSlice();
@@ -184,20 +167,9 @@ final class DocumentsWriterPerThread implements Accountable {
       infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
     }
     this.enableTestPoints = enableTestPoints;
-    this.indexVersionCreated = indexVersionCreated;
-    // this should be the last call in the ctor
-    // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
-    consumer = indexWriterConfig.getIndexingChain().getChain(this);
+    consumer = indexWriterConfig.getIndexingChain().getChain(indexVersionCreated, segmentInfo, this.directory, fieldInfos, indexWriterConfig, this::onAbortingException);
   }
 
-  FieldInfos.Builder getFieldInfosBuilder() {
-    return fieldInfos;
-  }
-
-  int getIndexCreatedVersionMajor() {
-    return indexVersionCreated;
-  }
-
   final void testPoint(String message) {
     if (enableTestPoints) {
       assert infoStream.isEnabled("TP"); // don't enable unless you need them.
@@ -218,7 +190,7 @@ final class DocumentsWriterPerThread implements Accountable {
   long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     try {
       testPoint("DocumentsWriterPerThread addDocuments start");
-      assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
+      assert abortingException == null: "DWPT has hit aborting exception but is still indexing";
       if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + numDocsInRAM + " seg=" + segmentInfo.name);
       }
@@ -290,12 +262,10 @@ final class DocumentsWriterPerThread implements Accountable {
   private void deleteLastDocs(int docCount) {
     int from = numDocsInRAM-docCount;
     int to = numDocsInRAM;
-    int size = deleteDocIDs.length;
     deleteDocIDs = ArrayUtil.grow(deleteDocIDs, numDeletedDocIds + (to-from));
     for (int docId = from; docId < to; docId++) {
       deleteDocIDs[numDeletedDocIds++] = docId;
     }
-    bytesUsed.addAndGet((deleteDocIDs.length - size) * Integer.BYTES);
     // NOTE: we do not trigger flush here.  This is
     // potentially a RAM leak, if you have an app that tries
     // to add docs but every single doc always hits a
@@ -354,9 +324,7 @@ final class DocumentsWriterPerThread implements Accountable {
         flushState.liveDocs.clear(deleteDocIDs[i]);
       }
       flushState.delCountOnFlush = numDeletedDocIds;
-      bytesUsed.addAndGet(-(deleteDocIDs.length * Integer.BYTES));
-      deleteDocIDs = null;
-
+      deleteDocIDs = new int[0];
     }
 
     if (aborted) {
@@ -374,8 +342,8 @@ final class DocumentsWriterPerThread implements Accountable {
     final Sorter.DocMap sortMap;
     try {
       DocIdSetIterator softDeletedDocs;
-      if (getIndexWriterConfig().getSoftDeletesField() != null) {
-        softDeletedDocs = consumer.getHasDocValues(getIndexWriterConfig().getSoftDeletesField());
+      if (indexWriterConfig.getSoftDeletesField() != null) {
+        softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
       } else {
         softDeletedDocs = null;
       }
@@ -439,7 +407,7 @@ final class DocumentsWriterPerThread implements Accountable {
   }
 
   private void maybeAbort(String location, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
-    if (hasHitAbortingException() && aborted == false) {
+    if (abortingException != null && aborted == false) {
       // if we are already aborted don't do anything here
       try {
         abort();
@@ -483,7 +451,7 @@ final class DocumentsWriterPerThread implements Accountable {
 
     boolean success = false;
     try {
-      if (getIndexWriterConfig().getUseCompoundFile()) {
+      if (indexWriterConfig.getUseCompoundFile()) {
         Set<String> originalFiles = newSegment.info.files();
         // TODO: like addIndexes, we are relying on createCompoundFile to successfully cleanup...
         IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context, flushNotifications::deleteUnusedFiles);
@@ -550,7 +518,7 @@ final class DocumentsWriterPerThread implements Accountable {
 
   @Override
   public long ramBytesUsed() {
-    return bytesUsed.get() + pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed();
+    return (deleteDocIDs.length * Integer.BYTES)+ pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed();
   }
 
   @Override
@@ -558,38 +526,6 @@ final class DocumentsWriterPerThread implements Accountable {
     return List.of(pendingUpdates, consumer);
   }
 
-  /* Initial chunks size of the shared byte[] blocks used to
-     store postings data */
-  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
-
-  /* if you increase this, you must fix field cache impl for
-   * getTerms/getTermsIndex requires <= 32768 */
-  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
-
-
-  private static class IntBlockAllocator extends IntBlockPool.Allocator {
-    private final Counter bytesUsed;
-
-    public IntBlockAllocator(Counter bytesUsed) {
-      super(IntBlockPool.INT_BLOCK_SIZE);
-      this.bytesUsed = bytesUsed;
-    }
-
-    /* Allocate another int[] from the shared pool */
-    @Override
-    public int[] getIntBlock() {
-      int[] b = new int[IntBlockPool.INT_BLOCK_SIZE];
-      bytesUsed.addAndGet(IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES);
-      return b;
-    }
-
-    @Override
-    public void recycleIntBlocks(int[][] blocks, int offset, int length) {
-      bytesUsed.addAndGet(-(length * (IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES)));
-    }
-
-  }
-
   @Override
   public String toString() {
     return "DocumentsWriterPerThread [pendingDeletes=" + pendingUpdates
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 5db3ec4294c..29a7e0bdf7e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -17,7 +17,6 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,9 +26,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
@@ -49,12 +48,12 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable {
 
   private final Set<DocumentsWriterPerThread> dwpts = Collections.newSetFromMap(new IdentityHashMap<>());
   private final Deque<DocumentsWriterPerThread> freeList = new ArrayDeque<>();
-  private final IOSupplier<DocumentsWriterPerThread> dwptFactory;
+  private final Supplier<DocumentsWriterPerThread> dwptFactory;
   private int takenWriterPermits = 0;
   private boolean closed;
 
-  DocumentsWriterPerThreadPool(IOSupplier<DocumentsWriterPerThread> dwptFactory) {
+  DocumentsWriterPerThreadPool(Supplier<DocumentsWriterPerThread> dwptFactory) {
     this.dwptFactory = dwptFactory;
   }
 
@@ -86,7 +85,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable {
     assert takenWriterPermits >= 0;
     while (takenWriterPermits > 0) {
       // we can't create new DWPTs while not all permits are available
@@ -110,7 +109,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable {
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
   An <code>IndexWriter</code> creates and maintains an index.
@@ -272,7 +273,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * and a message is printed to infoStream, if set (see {@link
    * IndexWriterConfig#setInfoStream(InfoStream)}).
    */
-  public final static int MAX_TERM_LENGTH = DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8;
+  public final static int MAX_TERM_LENGTH = BYTE_BLOCK_SIZE-2;
 
   /**
    * Maximum length string for a stored field.
diff --git a/lucene/core/src/java/org/apache/lucene/index/SortingTermVectorsConsumer.java b/lucene/core/src/java/org/apache/lucene/index/SortingTermVectorsConsumer.java
index 5c0fc876119..c005edef198 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SortingTermVectorsConsumer.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SortingTermVectorsConsumer.java
@@ -21,20 +21,24 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.NormsProducer;
 import org.apache.lucene.codecs.TermVectorsReader;
 import org.apache.lucene.codecs.TermVectorsWriter;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FlushInfo;
 import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.IntBlockPool;
 
 final class SortingTermVectorsConsumer extends TermVectorsConsumer {
   TrackingTmpOutputDirectoryWrapper tmpDirectory;
 
-  SortingTermVectorsConsumer(DocumentsWriterPerThread docWriter) {
-    super(docWriter);
+  SortingTermVectorsConsumer(final IntBlockPool.Allocator intBlockAllocator, final ByteBlockPool.Allocator byteBlockAllocator, Directory directory, SegmentInfo info, Codec codec) {
+    super(intBlockAllocator, byteBlockAllocator, directory, info, codec);
   }
 
   @Override
@@ -48,10 +52,10 @@ final class SortingTermVectorsConsumer extends TermVectorsConsumer {
       }
       return;
     }
-    TermVectorsReader reader = docWriter.codec.termVectorsFormat()
+    TermVectorsReader reader = codec.termVectorsFormat()
         .vectorsReader(tmpDirectory, state.segmentInfo, state.fieldInfos, IOContext.DEFAULT);
     TermVectorsReader mergeReader = reader.getMergeInstance();
-    TermVectorsWriter writer = docWriter.codec.termVectorsFormat()
+    TermVectorsWriter writer = codec.termVectorsFormat()
         .vectorsWriter(state.directory, state.segmentInfo, IOContext.DEFAULT);
     try {
       reader.checkIntegrity();
@@ -71,9 +75,9 @@ final class SortingTermVectorsConsumer extends TermVectorsConsumer {
   @Override
   void initTermVectorsWriter() throws IOException {
     if (writer == null) {
-      IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.ramBytesUsed()));
-      tmpDirectory = new TrackingTmpOutputDirectoryWrapper(docWriter.directory);
-      writer = docWriter.codec.termVectorsFormat().vectorsWriter(tmpDirectory, docWriter.getSegmentInfo(), context);
+      IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get()));
+      tmpDirectory = new TrackingTmpOutputDirectoryWrapper(directory);
+      writer = codec.termVectorsFormat().vectorsWriter(tmpDirectory, info, context);
       lastDocID = 0;
     }
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java b/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java
index 4f042d2901b..1f6b1973540 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java
@@ -21,23 +21,29 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.NormsProducer;
 import org.apache.lucene.codecs.TermVectorsWriter;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FlushInfo;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Counter;
 import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.IntBlockPool;
 import org.apache.lucene.util.RamUsageEstimator;
 
 class TermVectorsConsumer extends TermsHash {
+  protected final Directory directory;
+  protected final SegmentInfo info;
+  protected final Codec codec;
 
   TermVectorsWriter writer;
 
   /** Scratch term used by TermVectorsConsumerPerField.finishDocument. */
   final BytesRef flushTerm = new BytesRef();
-  final DocumentsWriterPerThread docWriter;
 
   /** Used by TermVectorsConsumerPerField when serializing
    *  the term vectors. */
@@ -49,9 +55,11 @@ class TermVectorsConsumer extends TermsHash {
   int lastDocID;
   private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[1];
 
-  TermVectorsConsumer(DocumentsWriterPerThread docWriter) {
-    super(docWriter, Counter.newCounter(), null);
-    this.docWriter = docWriter;
+  TermVectorsConsumer(final IntBlockPool.Allocator intBlockAllocator, final ByteBlockPool.Allocator byteBlockAllocator, Directory directory, SegmentInfo info, Codec codec) {
+    super(intBlockAllocator, byteBlockAllocator, Counter.newCounter(), null);
+    this.directory = directory;
+    this.info = info;
+    this.codec = codec;
   }
 
   @Override
@@ -85,8 +93,8 @@ class TermVectorsConsumer extends TermsHash {
 
   void initTermVectorsWriter() throws IOException {
     if (writer == null) {
-      IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.ramBytesUsed()));
-      writer = docWriter.codec.termVectorsFormat().vectorsWriter(docWriter.directory, docWriter.getSegmentInfo(), context);
+      IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get()));
+      writer = codec.termVectorsFormat().vectorsWriter(directory, info, context);
       lastDocID = 0;
     }
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/TermsHash.java b/lucene/core/src/java/org/apache/lucene/index/TermsHash.java
index d2e6b28dcb0..c2313e09296 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TermsHash.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TermsHash.java
@@ -41,11 +41,11 @@ abstract class TermsHash {
   ByteBlockPool termBytePool;
   final Counter bytesUsed;
 
-  TermsHash(final DocumentsWriterPerThread docWriter, Counter bytesUsed, TermsHash nextTermsHash) {
+  TermsHash(final IntBlockPool.Allocator intBlockAllocator, final ByteBlockPool.Allocator byteBlockAllocator, Counter bytesUsed, TermsHash nextTermsHash) {
     this.nextTermsHash = nextTermsHash;
     this.bytesUsed = bytesUsed;
-    intPool = new IntBlockPool(docWriter.intBlockAllocator);
-    bytePool = new ByteBlockPool(docWriter.byteBlockAllocator);
+    intPool = new IntBlockPool(intBlockAllocator);
+    bytePool = new ByteBlockPool(byteBlockAllocator);
 
     if (nextTermsHash != null) {
       // We are primary
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java
index dc317508c2e..c84fc99198b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java
@@ -74,8 +74,6 @@ public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
         fail();
       } catch (AlreadyClosedException e) {
         // fine
-      } catch (IOException e) {
-        throw new AssertionError(e);
       }
     });
     t.start();
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 3ede19e1826..f428b94f9b7 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -1533,7 +1533,7 @@ public class TestIndexWriter extends LuceneTestCase {
     Directory dir = newDirectory();
     RandomIndexWriter w = new RandomIndexWriter(random(), dir, new StringSplitAnalyzer());
 
-    char[] chars = new char[DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8];
+    char[] chars = new char[IndexWriter.MAX_TERM_LENGTH];
     Arrays.fill(chars, 'x');
     Document hugeDoc = new Document();
     final String bigTerm = new String(chars);
@@ -3717,8 +3717,6 @@ public class TestIndexWriter extends LuceneTestCase {
           states.add(state::unlock);
           state.deleteQueue.getNextSequenceNumber();
         }
-      } catch (IOException e) {
-        throw new AssertionError(e);
       } finally {
         IOUtils.closeWhileHandlingException(states);
       }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index dffb1576c87..a1cef6cc0ec 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Consumer;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.codecs.Codec;
@@ -43,12 +44,13 @@ public class TestIndexWriterConfig extends LuceneTestCase {
   }
 
   private static final class MyIndexingChain extends IndexingChain {
-    // Does not implement anything - used only for type checking on IndexWriterConfig.
 
     @Override
-    DocConsumer getChain(DocumentsWriterPerThread documentsWriter) {
+    DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
+                         FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
+                         Consumer<Throwable> abortingExceptionConsumer) {
       return null;
     }
+    // Does not implement anything - used only for type checking on IndexWriterConfig.
   }
diff --git a/lucene/highlighter/src/test/org/apache/lucene/search/matchhighlight/TestPassageSelector.java b/lucene/highlighter/src/test/org/apache/lucene/search/matchhighlight/TestPassageSelector.java
index c3ec7c834fb..153e98f6295 100644
--- a/lucene/highlighter/src/test/org/apache/lucene/search/matchhighlight/TestPassageSelector.java
+++ b/lucene/highlighter/src/test/org/apache/lucene/search/matchhighlight/TestPassageSelector.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 
 import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
 
+@LuceneTestCase.AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/LUCENE-9521")
 public class TestPassageSelector extends LuceneTestCase {
   @Test
   public void checkEmptyExtra() {
diff --git a/settings.gradle b/settings.gradle
index 309cc5ac0c8..aeafd2b5a37 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+rootProject.name = "lucene-solr"
+
 includeBuild("dev-tools/missing-doclet")
 
 include "lucene:analysis:common"
diff --git a/solr/core/src/java/org/apache/solr/core/ConfigSet.java b/solr/core/src/java/org/apache/solr/core/ConfigSet.java
index 81124772801..7161bc1e0ab 100644
--- a/solr/core/src/java/org/apache/solr/core/ConfigSet.java
+++ b/solr/core/src/java/org/apache/solr/core/ConfigSet.java
@@ -30,6 +30,7 @@ public class ConfigSet {
   private final String name;
 
   private final SolrConfig solrconfig;
+  private IndexSchema schema;
 
   private final SchemaSupplier schemaSupplier;
 
@@ -40,10 +41,11 @@ public class ConfigSet {
 
   @SuppressWarnings({"rawtypes"})
   public ConfigSet(String name, SolrConfig solrConfig, SchemaSupplier indexSchemaSupplier,
-                   NamedList properties, boolean trusted) {
+                   NamedList properties, boolean trusted) {
     this.name = name;
     this.solrconfig = solrConfig;
     this.schemaSupplier = indexSchemaSupplier;
+    schema = schemaSupplier.get(true);
     this.properties = properties;
     this.trusted = trusted;
   }
@@ -61,17 +63,18 @@ public class ConfigSet {
    * @param forceFetch get a fresh value and not cached value
    */
   public IndexSchema getIndexSchema(boolean forceFetch) {
-    return schemaSupplier.get(forceFetch);
+    if(forceFetch) schema = schemaSupplier.get(true);
+    return schema;
   }
 
   public IndexSchema getIndexSchema() {
-    return schemaSupplier.get(false);
+    return schema;
   }
 
   @SuppressWarnings({"rawtypes"})
   public NamedList getProperties() {
     return properties;
   }
-  
+
   public boolean isTrusted() {
     return trusted;
   }
@@ -82,7 +85,7 @@ public class ConfigSet {
    * So, we may not be able to update the core if we the schema classes are updated
    * */
   interface SchemaSupplier {
-    IndexSchema get(boolean forceFetch);
+    IndexSchema get(boolean forceFetch);
   }
 }
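
For context on the API change recorded in CHANGES.txt above, the sketch below shows what a custom IndexingChain looks like against the new getChain signature. It is an illustration only, not part of the patch: it simply mirrors what DocumentsWriterPerThread.defaultIndexingChain does in the diff, the field name loggingChain is invented, and since IndexingChain, DocConsumer, FieldInfos.Builder and LiveIndexWriterConfig are package-private, such code would have to live in org.apache.lucene.index.

// Illustrative sketch only -- mirrors defaultIndexingChain from the patch above.
static final DocumentsWriterPerThread.IndexingChain loggingChain = new DocumentsWriterPerThread.IndexingChain() {
  @Override
  DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                       FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                       Consumer<Throwable> abortingExceptionConsumer) {
    // The chain no longer receives the DocumentsWriterPerThread itself: the segment info,
    // directory, field infos, config and an aborting-exception callback are passed in
    // directly, so the DocConsumer can be constructed without a back-reference to the DWPT.
    return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory,
        fieldInfos, indexWriterConfig, abortingExceptionConsumer);
  }
};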