Commit a8099d6367 in apache/lucene-solr (mirror of https://github.com/apache/lucene.git)
Merge branch 'master' of github.com:apache/lucene-solr
lucene/CHANGES.txt

@@ -182,6 +182,10 @@ API Changes
  is now public, making it easier for applications to decode facet
  ordinals into their corresponding labels (Ankur)

* LUCENE-9515: IndexingChain now accepts individual primitives rather than a
  DocumentsWriterPerThread instance in order to create a new DocConsumer.
  (Simon Willnauer)

New Features
---------------------
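The LUCENE-9515 entry above is the theme of the whole commit: the indexing chain no longer receives a DocumentsWriterPerThread, only the pieces it actually needs. Below is a minimal sketch of the new package-private factory contract, mirroring the abstract signature and the defaultIndexingChain wiring shown in the DocumentsWriterPerThread hunks further down; the field name defaultChainSketch is made up and the snippet is illustrative, not part of the patch.

    // Sketch only: IndexingChain, DocConsumer and DefaultIndexingChain are package-private
    // members of org.apache.lucene.index, so this can only live inside that package.
    abstract static class IndexingChain {
      abstract DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                                    FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                                    Consumer<Throwable> abortingExceptionConsumer);
    }

    static final IndexingChain defaultChainSketch = new IndexingChain() {
      @Override
      DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                           FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                           Consumer<Throwable> abortingExceptionConsumer) {
        // Same wiring as DocumentsWriterPerThread.defaultIndexingChain after this commit.
        return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory,
                                        fieldInfos, indexWriterConfig, abortingExceptionConsumer);
      }
    };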
ByteSliceWriter.java

@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ByteBlockPool;

import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;


/**
@@ -28,6 +29,10 @@ import org.apache.lucene.util.ByteBlockPool;
 */
final class ByteSliceWriter extends DataOutput {

  /* Initial chunks size of the shared byte[] blocks used to
     store postings data */
  private final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;

  private byte[] slice;
  private int upto;
  private final ByteBlockPool pool;
@@ -80,6 +85,6 @@ final class ByteSliceWriter extends DataOutput {
  }

  public int getAddress() {
    return upto + (offset0 & DocumentsWriterPerThread.BYTE_BLOCK_NOT_MASK);
    return upto + (offset0 & BYTE_BLOCK_NOT_MASK);
  }
}
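The getAddress() computation itself is unchanged; BYTE_BLOCK_NOT_MASK simply moves from DocumentsWriterPerThread into ByteSliceWriter, derived locally from ByteBlockPool.BYTE_BLOCK_MASK. A small worked example of the address arithmetic, assuming Lucene's default byte block size of 1 << 15; the concrete numbers are illustrative only.

    // Illustration of upto + (offset0 & BYTE_BLOCK_NOT_MASK); not code from the patch.
    final int BYTE_BLOCK_SIZE = 1 << 15;              // 32768, the default byte block size
    final int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;  // 0x7FFF, keeps the within-block bits
    final int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK; // keeps the block-aligned bits

    int offset0 = 3 * BYTE_BLOCK_SIZE + 1234; // absolute pool offset somewhere inside block 3
    int upto = 77;                            // write position within the current block

    // Masking offset0 drops the low bits (1234), leaving the absolute start of block 3;
    // adding upto yields the absolute address of the current write position: 3 * 32768 + 77.
    int address = upto + (offset0 & BYTE_BLOCK_NOT_MASK);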
DefaultIndexingChain.java

@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
@@ -40,20 +41,24 @@ import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash.MaxBytesLengthExceededException;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.IntBlockPool;
import org.apache.lucene.util.RamUsageEstimator;

/** Default general purpose indexing chain, which handles
 *  indexing all types of fields. */
final class DefaultIndexingChain extends DocConsumer {


  final Counter bytesUsed = Counter.newCounter();
  final DocumentsWriterPerThread docWriter;
  final FieldInfos.Builder fieldInfos;

  // Writes postings and term vectors:
@@ -73,21 +78,38 @@ final class DefaultIndexingChain extends DocConsumer {
  // Holds fields seen in each document
  private PerField[] fields = new PerField[1];
  private final InfoStream infoStream;
  private final ByteBlockPool.Allocator byteBlockAllocator;
  private final LiveIndexWriterConfig indexWriterConfig;
  private final int indexCreatedVersionMajor;
  private final Consumer<Throwable> abortingExceptionConsumer;
  private boolean hasHitAbortingException;

  DefaultIndexingChain(DocumentsWriterPerThread docWriter) {
    this.docWriter = docWriter;
    this.fieldInfos = docWriter.getFieldInfosBuilder();
    this.infoStream = docWriter.getIndexWriterConfig().getInfoStream();
  DefaultIndexingChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory, FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                       Consumer<Throwable> abortingExceptionConsumer) {
    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
    byteBlockAllocator = new ByteBlockPool.DirectTrackingAllocator(bytesUsed);
    IntBlockPool.Allocator intBlockAllocator = new IntBlockAllocator(bytesUsed);
    this.indexWriterConfig = indexWriterConfig;
    assert segmentInfo.getIndexSort() == indexWriterConfig.getIndexSort();
    this.fieldInfos = fieldInfos;
    this.infoStream = indexWriterConfig.getInfoStream();
    this.abortingExceptionConsumer = abortingExceptionConsumer;

    final TermsHash termVectorsWriter;
    if (docWriter.getSegmentInfo().getIndexSort() == null) {
      storedFieldsConsumer = new StoredFieldsConsumer(docWriter.codec, docWriter.directory, docWriter.getSegmentInfo());
      termVectorsWriter = new TermVectorsConsumer(docWriter);
    if (segmentInfo.getIndexSort() == null) {
      storedFieldsConsumer = new StoredFieldsConsumer(indexWriterConfig.getCodec(), directory, segmentInfo);
      termVectorsWriter = new TermVectorsConsumer(intBlockAllocator, byteBlockAllocator, directory, segmentInfo, indexWriterConfig.getCodec());
    } else {
      storedFieldsConsumer = new SortingStoredFieldsConsumer(docWriter.codec, docWriter.directory, docWriter.getSegmentInfo());
      termVectorsWriter = new SortingTermVectorsConsumer(docWriter);
      storedFieldsConsumer = new SortingStoredFieldsConsumer(indexWriterConfig.getCodec(), directory, segmentInfo);
      termVectorsWriter = new SortingTermVectorsConsumer(intBlockAllocator, byteBlockAllocator, directory, segmentInfo, indexWriterConfig.getCodec());
    }
    termsHash = new FreqProxTermsWriter(docWriter, bytesUsed, termVectorsWriter);
    termsHash = new FreqProxTermsWriter(intBlockAllocator, byteBlockAllocator, bytesUsed, termVectorsWriter);
  }

  private void onAbortingException(Throwable th) {
    assert th != null;
    this.hasHitAbortingException = true;
    abortingExceptionConsumer.accept(th);
  }

  private LeafReader getDocValuesLeafReader() {
@@ -247,7 +269,7 @@ final class DefaultIndexingChain extends DocConsumer {
    // FreqProxTermsWriter does this with
    // FieldInfo.storePayload.
    t0 = System.nanoTime();
    docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
    indexWriterConfig.getCodec().fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
    if (infoStream.isEnabled("IW")) {
      infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write fieldInfos");
    }
@@ -423,7 +445,7 @@ final class DefaultIndexingChain extends DocConsumer {
    try {
      storedFieldsConsumer.startDocument(docID);
    } catch (Throwable th) {
      docWriter.onAbortingException(th);
      onAbortingException(th);
      throw th;
    }
  }
@@ -434,7 +456,7 @@ final class DefaultIndexingChain extends DocConsumer {
    try {
      storedFieldsConsumer.finishDocument();
    } catch (Throwable th) {
      docWriter.onAbortingException(th);
      onAbortingException(th);
      throw th;
    }
  }
@@ -463,7 +485,7 @@ final class DefaultIndexingChain extends DocConsumer {
        fieldCount = processField(docID, field, fieldGen, fieldCount);
      }
    } finally {
      if (docWriter.hasHitAbortingException() == false) {
      if (hasHitAbortingException == false) {
        // Finish each indexed field name seen in the document:
        for (int i=0;i<fieldCount;i++) {
          fields[i].finish(docID);
@@ -477,7 +499,7 @@ final class DefaultIndexingChain extends DocConsumer {
    } catch (Throwable th) {
      // Must abort, on the possibility that on-disk term
      // vectors are now corrupt:
      docWriter.onAbortingException(th);
      abortingExceptionConsumer.accept(th);
      throw th;
    }
  }
@@ -519,7 +541,7 @@ final class DefaultIndexingChain extends DocConsumer {
      try {
        storedFieldsConsumer.writeField(fp.fieldInfo, field);
      } catch (Throwable th) {
        docWriter.onAbortingException(th);
        onAbortingException(th);
        throw th;
      }
    }
@@ -580,7 +602,7 @@ final class DefaultIndexingChain extends DocConsumer {
    fp.fieldInfo.setPointDimensions(pointDimensionCount, pointIndexDimensionCount, dimensionNumBytes);

    if (fp.pointValuesWriter == null) {
      fp.pointValuesWriter = new PointValuesWriter(docWriter.byteBlockAllocator, bytesUsed, fp.fieldInfo);
      fp.pointValuesWriter = new PointValuesWriter(byteBlockAllocator, bytesUsed, fp.fieldInfo);
    }
    fp.pointValuesWriter.addPackedValue(docID, field.binaryValue());
  }
@@ -647,8 +669,8 @@ final class DefaultIndexingChain extends DocConsumer {
      // This is the first time we are seeing this field indexed with doc values, so we
      // now record the DV type so that any future attempt to (illegally) change
      // the DV type of this field, will throw an IllegalArgExc:
      if (docWriter.getSegmentInfo().getIndexSort() != null) {
        final Sort indexSort = docWriter.getSegmentInfo().getIndexSort();
      if (indexWriterConfig.getIndexSort() != null) {
        final Sort indexSort = indexWriterConfig.getIndexSort();
        validateIndexSortDVType(indexSort, fp.fieldInfo.name, dvType);
      }
      fieldInfos.globalFieldNumbers.setDocValuesType(fp.fieldInfo.number, fp.fieldInfo.name, dvType);
@@ -735,8 +757,7 @@ final class DefaultIndexingChain extends DocConsumer {
        attributes.forEach((k, v) -> fi.putAttribute(k, v));
      }

      LiveIndexWriterConfig indexWriterConfig = docWriter.getIndexWriterConfig();
      fp = new PerField(docWriter.getIndexCreatedVersionMajor(), fi, invert,
      fp = new PerField(indexCreatedVersionMajor, fi, invert,
          indexWriterConfig.getSimilarity(), indexWriterConfig.getInfoStream(), indexWriterConfig.getAnalyzer());
      fp.next = fieldHash[hashPos];
      fieldHash[hashPos] = fp;
@@ -943,14 +964,14 @@ final class DefaultIndexingChain extends DocConsumer {
      byte[] prefix = new byte[30];
      BytesRef bigTerm = invertState.termAttribute.getBytesRef();
      System.arraycopy(bigTerm.bytes, bigTerm.offset, prefix, 0, 30);
      String msg = "Document contains at least one immense term in field=\"" + fieldInfo.name + "\" (whose UTF8 encoding is longer than the max length " + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + Arrays.toString(prefix) + "...', original message: " + e.getMessage();
      String msg = "Document contains at least one immense term in field=\"" + fieldInfo.name + "\" (whose UTF8 encoding is longer than the max length " + IndexWriter.MAX_TERM_LENGTH + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + Arrays.toString(prefix) + "...', original message: " + e.getMessage();
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "ERROR: " + msg);
      }
      // Document will be deleted above:
      throw new IllegalArgumentException(msg, e);
    } catch (Throwable th) {
      docWriter.onAbortingException(th);
      onAbortingException(th);
      throw th;
    }
  }
@@ -992,4 +1013,28 @@ final class DefaultIndexingChain extends DocConsumer {
    }
    return null;
  }

  private static class IntBlockAllocator extends IntBlockPool.Allocator {
    private final Counter bytesUsed;

    IntBlockAllocator(Counter bytesUsed) {
      super(IntBlockPool.INT_BLOCK_SIZE);
      this.bytesUsed = bytesUsed;
    }

    /* Allocate another int[] from the shared pool */
    @Override
    public int[] getIntBlock() {
      int[] b = new int[IntBlockPool.INT_BLOCK_SIZE];
      bytesUsed.addAndGet(IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES);
      return b;
    }

    @Override
    public void recycleIntBlocks(int[][] blocks, int offset, int length) {
      bytesUsed.addAndGet(-(length * (IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES)));
    }

  }

}
DocumentsWriterFlushControl.java

@@ -18,7 +18,6 @@ package org.apache.lucene.index;


import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -440,7 +439,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
    flushDeletes.set(true);
  }

  DocumentsWriterPerThread obtainAndLock() throws IOException {
  DocumentsWriterPerThread obtainAndLock() {
    while (closed == false) {
      final DocumentsWriterPerThread perThread = perThreadPool.getAndLock();
      if (perThread.deleteQueue == documentsWriter.deleteQueue) {
DocumentsWriterPerThread.java

@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
@@ -38,55 +39,44 @@ import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.IntBlockPool;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;

import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;

final class DocumentsWriterPerThread implements Accountable {

  LiveIndexWriterConfig getIndexWriterConfig() {
    return indexWriterConfig;
  }

  /**
   * The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
   * The IndexingChain must define the {@link #getChain(int, SegmentInfo, Directory, FieldInfos.Builder, LiveIndexWriterConfig, Consumer)} method
   * which returns the DocConsumer that the DocumentsWriter calls to process the
   * documents.
   */
  abstract static class IndexingChain {
    abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException;
    abstract DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                                  FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                                  Consumer<Throwable> abortingExceptionConsumer);
  }

  private Throwable abortingException;

  final void onAbortingException(Throwable throwable) {
  private void onAbortingException(Throwable throwable) {
    assert throwable != null : "aborting exception must not be null";
    assert abortingException == null: "aborting exception has already been set";
    abortingException = throwable;
  }

  final boolean hasHitAbortingException() {
    return abortingException != null;
  }

  final boolean isAborted() {
    return aborted;
  }


  static final IndexingChain defaultIndexingChain = new IndexingChain() {

    @Override
    DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
      return new DefaultIndexingChain(documentsWriterPerThread);
    DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                         FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                         Consumer<Throwable> abortingExceptionConsumer) {
      return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory, fieldInfos, indexWriterConfig, abortingExceptionConsumer);
    }
  };

@@ -135,7 +125,6 @@ final class DocumentsWriterPerThread implements Accountable {
  final Codec codec;
  final TrackingDirectoryWrapper directory;
  private final DocConsumer consumer;
  private final Counter bytesUsed;

  // Updates for our still-in-RAM (to be flushed next) segment
  private final BufferedUpdates pendingUpdates;
@@ -151,29 +140,23 @@ final class DocumentsWriterPerThread implements Accountable {
  final DocumentsWriterDeleteQueue deleteQueue;
  private final DeleteSlice deleteSlice;
  private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
  final Allocator byteBlockAllocator;
  final IntBlockPool.Allocator intBlockAllocator;
  private final AtomicLong pendingNumDocs;
  private final LiveIndexWriterConfig indexWriterConfig;
  private final boolean enableTestPoints;
  private final int indexVersionCreated;
  private final ReentrantLock lock = new ReentrantLock();
  private int[] deleteDocIDs = new int[0];
  private int numDeletedDocIds = 0;


  DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, DocumentsWriterDeleteQueue deleteQueue,
                           FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
  DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory,
                           LiveIndexWriterConfig indexWriterConfig, DocumentsWriterDeleteQueue deleteQueue,
                           FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) {
    this.directory = new TrackingDirectoryWrapper(directory);
    this.fieldInfos = fieldInfos;
    this.indexWriterConfig = indexWriterConfig;
    this.infoStream = indexWriterConfig.getInfoStream();
    this.codec = indexWriterConfig.getCodec();
    this.pendingNumDocs = pendingNumDocs;
    bytesUsed = Counter.newCounter();
    byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
    pendingUpdates = new BufferedUpdates(segmentName);
    intBlockAllocator = new IntBlockAllocator(bytesUsed);
    this.deleteQueue = Objects.requireNonNull(deleteQueue);
    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
    deleteSlice = deleteQueue.newSlice();
@@ -184,18 +167,7 @@ final class DocumentsWriterPerThread implements Accountable {
      infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
    }
    this.enableTestPoints = enableTestPoints;
    this.indexVersionCreated = indexVersionCreated;
    // this should be the last call in the ctor
    // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
    consumer = indexWriterConfig.getIndexingChain().getChain(this);
  }

  FieldInfos.Builder getFieldInfosBuilder() {
    return fieldInfos;
  }

  int getIndexCreatedVersionMajor() {
    return indexVersionCreated;
    consumer = indexWriterConfig.getIndexingChain().getChain(indexVersionCreated, segmentInfo, this.directory, fieldInfos, indexWriterConfig, this::onAbortingException);
  }

  final void testPoint(String message) {
@@ -218,7 +190,7 @@ final class DocumentsWriterPerThread implements Accountable {
  long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    try {
      testPoint("DocumentsWriterPerThread addDocuments start");
      assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
      assert abortingException == null: "DWPT has hit aborting exception but is still indexing";
      if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + numDocsInRAM + " seg=" + segmentInfo.name);
      }
@@ -290,12 +262,10 @@ final class DocumentsWriterPerThread implements Accountable {
  private void deleteLastDocs(int docCount) {
    int from = numDocsInRAM-docCount;
    int to = numDocsInRAM;
    int size = deleteDocIDs.length;
    deleteDocIDs = ArrayUtil.grow(deleteDocIDs, numDeletedDocIds + (to-from));
    for (int docId = from; docId < to; docId++) {
      deleteDocIDs[numDeletedDocIds++] = docId;
    }
    bytesUsed.addAndGet((deleteDocIDs.length - size) * Integer.BYTES);
    // NOTE: we do not trigger flush here. This is
    // potentially a RAM leak, if you have an app that tries
    // to add docs but every single doc always hits a
@@ -354,9 +324,7 @@ final class DocumentsWriterPerThread implements Accountable {
        flushState.liveDocs.clear(deleteDocIDs[i]);
      }
      flushState.delCountOnFlush = numDeletedDocIds;
      bytesUsed.addAndGet(-(deleteDocIDs.length * Integer.BYTES));
      deleteDocIDs = null;

      deleteDocIDs = new int[0];
    }

    if (aborted) {
@@ -374,8 +342,8 @@ final class DocumentsWriterPerThread implements Accountable {
    final Sorter.DocMap sortMap;
    try {
      DocIdSetIterator softDeletedDocs;
      if (getIndexWriterConfig().getSoftDeletesField() != null) {
        softDeletedDocs = consumer.getHasDocValues(getIndexWriterConfig().getSoftDeletesField());
      if (indexWriterConfig.getSoftDeletesField() != null) {
        softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
      } else {
        softDeletedDocs = null;
      }
@@ -439,7 +407,7 @@ final class DocumentsWriterPerThread implements Accountable {
  }

  private void maybeAbort(String location, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    if (hasHitAbortingException() && aborted == false) {
    if (abortingException != null && aborted == false) {
      // if we are already aborted don't do anything here
      try {
        abort();
@@ -483,7 +451,7 @@ final class DocumentsWriterPerThread implements Accountable {
    boolean success = false;
    try {

      if (getIndexWriterConfig().getUseCompoundFile()) {
      if (indexWriterConfig.getUseCompoundFile()) {
        Set<String> originalFiles = newSegment.info.files();
        // TODO: like addIndexes, we are relying on createCompoundFile to successfully cleanup...
        IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context, flushNotifications::deleteUnusedFiles);
@@ -550,7 +518,7 @@ final class DocumentsWriterPerThread implements Accountable {

  @Override
  public long ramBytesUsed() {
    return bytesUsed.get() + pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed();
    return (deleteDocIDs.length * Integer.BYTES)+ pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed();
  }

  @Override
@@ -558,38 +526,6 @@ final class DocumentsWriterPerThread implements Accountable {
    return List.of(pendingUpdates, consumer);
  }

  /* Initial chunks size of the shared byte[] blocks used to
     store postings data */
  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;

  /* if you increase this, you must fix field cache impl for
   * getTerms/getTermsIndex requires <= 32768 */
  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;


  private static class IntBlockAllocator extends IntBlockPool.Allocator {
    private final Counter bytesUsed;

    public IntBlockAllocator(Counter bytesUsed) {
      super(IntBlockPool.INT_BLOCK_SIZE);
      this.bytesUsed = bytesUsed;
    }

    /* Allocate another int[] from the shared pool */
    @Override
    public int[] getIntBlock() {
      int[] b = new int[IntBlockPool.INT_BLOCK_SIZE];
      bytesUsed.addAndGet(IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES);
      return b;
    }

    @Override
    public void recycleIntBlocks(int[][] blocks, int offset, int length) {
      bytesUsed.addAndGet(-(length * (IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES)));
    }

  }

  @Override
  public String toString() {
    return "DocumentsWriterPerThread [pendingDeletes=" + pendingUpdates
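One consequence of the new chain factory is visible above: DocumentsWriterPerThread.onAbortingException becomes private, and the chain receives it as a Consumer<Throwable> (the method reference this::onAbortingException) rather than calling back into the DWPT. A small self-contained sketch of that callback pattern; the class name AbortAwareComponent and its fields are made up for illustration.

    import java.util.function.Consumer;

    // Hypothetical component: it only knows "report that we must abort", not which object
    // ultimately records the aborting exception (in this commit, the DWPT does).
    final class AbortAwareComponent {
      private final Consumer<Throwable> abortingExceptionConsumer;
      private boolean hasHitAbortingException;

      AbortAwareComponent(Consumer<Throwable> abortingExceptionConsumer) {
        this.abortingExceptionConsumer = abortingExceptionConsumer;
      }

      void onAbortingException(Throwable th) {
        assert th != null;
        hasHitAbortingException = true;       // remember locally, as DefaultIndexingChain now does
        abortingExceptionConsumer.accept(th); // and notify the owner
      }
    }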
DocumentsWriterPerThreadPool.java

@@ -17,7 +17,6 @@
package org.apache.lucene.index;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -27,9 +26,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.ThreadInterruptedException;

/**
@@ -49,12 +48,12 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT

  private final Set<DocumentsWriterPerThread> dwpts = Collections.newSetFromMap(new IdentityHashMap<>());
  private final Deque<DocumentsWriterPerThread> freeList = new ArrayDeque<>();
  private final IOSupplier<DocumentsWriterPerThread> dwptFactory;
  private final Supplier<DocumentsWriterPerThread> dwptFactory;
  private int takenWriterPermits = 0;
  private boolean closed;


  DocumentsWriterPerThreadPool(IOSupplier<DocumentsWriterPerThread> dwptFactory) {
  DocumentsWriterPerThreadPool(Supplier<DocumentsWriterPerThread> dwptFactory) {
    this.dwptFactory = dwptFactory;
  }

@@ -86,7 +85,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
   *
   * @return a new {@link DocumentsWriterPerThread}
   */
  private synchronized DocumentsWriterPerThread newWriter() throws IOException {
  private synchronized DocumentsWriterPerThread newWriter() {
    assert takenWriterPermits >= 0;
    while (takenWriterPermits > 0) {
      // we can't create new DWPTs while not all permits are available
@@ -110,7 +109,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
  // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing

  /** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */
  DocumentsWriterPerThread getAndLock() throws IOException {
  DocumentsWriterPerThread getAndLock() {
    synchronized (this) {
      ensureOpen();
      // Important that we are LIFO here! This way if number of concurrent indexing threads was once high,
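The pool changes are a knock-on effect of the DocumentsWriterPerThread constructor no longer throwing IOException: the factory can be a plain java.util.function.Supplier instead of Lucene's IOSupplier, and newWriter()/getAndLock() (plus obtainAndLock() in DocumentsWriterFlushControl) drop their throws clauses. A hedged fragment of the wiring; createDwpt() is a hypothetical stand-in for the real constructor call.

    // Sketch only (not a complete class); createDwpt() is a hypothetical stand-in for the real
    // DocumentsWriterPerThread constructor call, which no longer declares IOException.
    java.util.function.Supplier<DocumentsWriterPerThread> dwptFactory = () -> createDwpt();
    DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool(dwptFactory);
    DocumentsWriterPerThread dwpt = pool.getAndLock(); // no throws clause anymore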
FreqProxTermsWriter.java

@@ -30,18 +30,20 @@ import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.IntBlockPool;
import org.apache.lucene.util.TimSorter;
import org.apache.lucene.util.automaton.CompiledAutomaton;

final class FreqProxTermsWriter extends TermsHash {

  FreqProxTermsWriter(DocumentsWriterPerThread docWriter, Counter bytesUsed, TermsHash termVectors) {
    super(docWriter, bytesUsed, termVectors);
  FreqProxTermsWriter(final IntBlockPool.Allocator intBlockAllocator, final ByteBlockPool.Allocator byteBlockAllocator, Counter bytesUsed, TermsHash termVectors) {
    super(intBlockAllocator, byteBlockAllocator, bytesUsed, termVectors);
  }

  private void applyDeletes(SegmentWriteState state, Fields fields) throws IOException {
IndexWriter.java

@@ -82,6 +82,7 @@ import org.apache.lucene.util.UnicodeUtil;
import org.apache.lucene.util.Version;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;

/**
  An <code>IndexWriter</code> creates and maintains an index.
@@ -272,7 +273,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   * and a message is printed to infoStream, if set (see {@link
   * IndexWriterConfig#setInfoStream(InfoStream)}).
   */
  public final static int MAX_TERM_LENGTH = DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8;
  public final static int MAX_TERM_LENGTH = BYTE_BLOCK_SIZE-2;

  /**
   * Maximum length string for a stored field.
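IndexWriter.MAX_TERM_LENGTH keeps its value (BYTE_BLOCK_SIZE - 2, i.e. 32766 bytes with the default 32 KB byte blocks); it is just no longer defined via DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8, which this commit removes. A small sketch of checking a value against the limit up front instead of relying on the "immense term" IllegalArgumentException seen in the DefaultIndexingChain hunk; the helper class and method names are made up.

    import java.nio.charset.StandardCharsets;
    import org.apache.lucene.index.IndexWriter;

    final class TermLengthCheck {
      // Hypothetical helper: true if the UTF-8 encoding of value fits in a single indexed term.
      static boolean fitsAsSingleTerm(String value) {
        int utf8Length = value.getBytes(StandardCharsets.UTF_8).length;
        return utf8Length <= IndexWriter.MAX_TERM_LENGTH; // 32766 by default
      }
    }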
SortingTermVectorsConsumer.java

@@ -21,20 +21,24 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.TermVectorsReader;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.IntBlockPool;

final class SortingTermVectorsConsumer extends TermVectorsConsumer {
  TrackingTmpOutputDirectoryWrapper tmpDirectory;

  SortingTermVectorsConsumer(DocumentsWriterPerThread docWriter) {
    super(docWriter);
  SortingTermVectorsConsumer(final IntBlockPool.Allocator intBlockAllocator, final ByteBlockPool.Allocator byteBlockAllocator, Directory directory, SegmentInfo info, Codec codec) {
    super(intBlockAllocator, byteBlockAllocator, directory, info, codec);
  }

  @Override
@@ -48,10 +52,10 @@ final class SortingTermVectorsConsumer extends TermVectorsConsumer {
      }
      return;
    }
    TermVectorsReader reader = docWriter.codec.termVectorsFormat()
    TermVectorsReader reader = codec.termVectorsFormat()
        .vectorsReader(tmpDirectory, state.segmentInfo, state.fieldInfos, IOContext.DEFAULT);
    TermVectorsReader mergeReader = reader.getMergeInstance();
    TermVectorsWriter writer = docWriter.codec.termVectorsFormat()
    TermVectorsWriter writer = codec.termVectorsFormat()
        .vectorsWriter(state.directory, state.segmentInfo, IOContext.DEFAULT);
    try {
      reader.checkIntegrity();
@@ -71,9 +75,9 @@ final class SortingTermVectorsConsumer extends TermVectorsConsumer {
  @Override
  void initTermVectorsWriter() throws IOException {
    if (writer == null) {
      IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.ramBytesUsed()));
      tmpDirectory = new TrackingTmpOutputDirectoryWrapper(docWriter.directory);
      writer = docWriter.codec.termVectorsFormat().vectorsWriter(tmpDirectory, docWriter.getSegmentInfo(), context);
      IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get()));
      tmpDirectory = new TrackingTmpOutputDirectoryWrapper(directory);
      writer = codec.termVectorsFormat().vectorsWriter(tmpDirectory, info, context);
      lastDocID = 0;
    }
  }
TermVectorsConsumer.java

@@ -21,23 +21,29 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.IntBlockPool;
import org.apache.lucene.util.RamUsageEstimator;

class TermVectorsConsumer extends TermsHash {
  protected final Directory directory;
  protected final SegmentInfo info;
  protected final Codec codec;
  TermVectorsWriter writer;

  /** Scratch term used by TermVectorsConsumerPerField.finishDocument. */
  final BytesRef flushTerm = new BytesRef();

  final DocumentsWriterPerThread docWriter;

  /** Used by TermVectorsConsumerPerField when serializing
   *  the term vectors. */
@@ -49,9 +55,11 @@ class TermVectorsConsumer extends TermsHash {
  int lastDocID;
  private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[1];

  TermVectorsConsumer(DocumentsWriterPerThread docWriter) {
    super(docWriter, Counter.newCounter(), null);
    this.docWriter = docWriter;
  TermVectorsConsumer(final IntBlockPool.Allocator intBlockAllocator, final ByteBlockPool.Allocator byteBlockAllocator, Directory directory, SegmentInfo info, Codec codec) {
    super(intBlockAllocator, byteBlockAllocator, Counter.newCounter(), null);
    this.directory = directory;
    this.info = info;
    this.codec = codec;
  }

  @Override
@@ -85,8 +93,8 @@ class TermVectorsConsumer extends TermsHash {

  void initTermVectorsWriter() throws IOException {
    if (writer == null) {
      IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.ramBytesUsed()));
      writer = docWriter.codec.termVectorsFormat().vectorsWriter(docWriter.directory, docWriter.getSegmentInfo(), context);
      IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get()));
      writer = codec.termVectorsFormat().vectorsWriter(directory, info, context);
      lastDocID = 0;
    }
  }
TermsHash.java

@@ -41,11 +41,11 @@ abstract class TermsHash {
  ByteBlockPool termBytePool;
  final Counter bytesUsed;

  TermsHash(final DocumentsWriterPerThread docWriter, Counter bytesUsed, TermsHash nextTermsHash) {
  TermsHash(final IntBlockPool.Allocator intBlockAllocator, final ByteBlockPool.Allocator byteBlockAllocator, Counter bytesUsed, TermsHash nextTermsHash) {
    this.nextTermsHash = nextTermsHash;
    this.bytesUsed = bytesUsed;
    intPool = new IntBlockPool(docWriter.intBlockAllocator);
    bytePool = new ByteBlockPool(docWriter.byteBlockAllocator);
    intPool = new IntBlockPool(intBlockAllocator);
    bytePool = new ByteBlockPool(byteBlockAllocator);

    if (nextTermsHash != null) {
      // We are primary
TestDocumentsWriterPerThreadPool.java

@@ -74,8 +74,6 @@ public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
        fail();
      } catch (AlreadyClosedException e) {
        // fine
      } catch (IOException e) {
        throw new AssertionError(e);
      }
    });
    t.start();
TestIndexWriter.java

@@ -1533,7 +1533,7 @@ public class TestIndexWriter extends LuceneTestCase {
    Directory dir = newDirectory();
    RandomIndexWriter w = new RandomIndexWriter(random(), dir, new StringSplitAnalyzer());

    char[] chars = new char[DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8];
    char[] chars = new char[IndexWriter.MAX_TERM_LENGTH];
    Arrays.fill(chars, 'x');
    Document hugeDoc = new Document();
    final String bigTerm = new String(chars);
@@ -3717,8 +3717,6 @@ public class TestIndexWriter extends LuceneTestCase {
          states.add(state::unlock);
          state.deleteQueue.getNextSequenceNumber();
        }
      } catch (IOException e) {
        throw new AssertionError(e);
      } finally {
        IOUtils.closeWhileHandlingException(states);
      }
TestIndexWriterConfig.java

@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.Codec;
@@ -43,12 +44,13 @@ public class TestIndexWriterConfig extends LuceneTestCase {
  }

  private static final class MyIndexingChain extends IndexingChain {
    // Does not implement anything - used only for type checking on IndexWriterConfig.

    @Override
    DocConsumer getChain(DocumentsWriterPerThread documentsWriter) {
    DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                         FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                         Consumer<Throwable> abortingExceptionConsumer) {
      return null;
    }
    // Does not implement anything - used only for type checking on IndexWriterConfig.

  }
TestPassageSelector.java

@@ -27,6 +27,7 @@ import java.util.Objects;

import static com.carrotsearch.randomizedtesting.RandomizedTest.*;

@LuceneTestCase.AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/LUCENE-9521")
public class TestPassageSelector extends LuceneTestCase {
  @Test
  public void checkEmptyExtra() {
settings.gradle

@@ -15,6 +15,8 @@
 * limitations under the License.
 */

rootProject.name = "lucene-solr"

includeBuild("dev-tools/missing-doclet")

include "lucene:analysis:common"
ConfigSet.java

@@ -30,6 +30,7 @@ public class ConfigSet {
  private final String name;

  private final SolrConfig solrconfig;
  private IndexSchema schema;

  private final SchemaSupplier schemaSupplier;

@@ -44,6 +45,7 @@ public class ConfigSet {
    this.name = name;
    this.solrconfig = solrConfig;
    this.schemaSupplier = indexSchemaSupplier;
    schema = schemaSupplier.get(true);
    this.properties = properties;
    this.trusted = trusted;
  }
@@ -61,10 +63,11 @@ public class ConfigSet {
   * @param forceFetch get a fresh value and not cached value
   */
  public IndexSchema getIndexSchema(boolean forceFetch) {
    return schemaSupplier.get(forceFetch);
    if(forceFetch) schema = schemaSupplier.get(true);
    return schema;
  }
  public IndexSchema getIndexSchema() {
    return schemaSupplier.get(false);
    return schema;
  }

  @SuppressWarnings({"rawtypes"})
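The Solr-side change is independent of the Lucene refactoring: ConfigSet now caches the IndexSchema fetched from its SchemaSupplier in the constructor and only refreshes that cached instance when forceFetch is true. A hypothetical usage fragment (configSet stands for any ConfigSet instance):

    // Illustration of the caching behavior added above; configSet is a hypothetical instance.
    IndexSchema cached = configSet.getIndexSchema();      // cached schema
    IndexSchema same = configSet.getIndexSchema(false);   // still the cached instance
    IndexSchema fresh = configSet.getIndexSchema(true);   // refreshed via SchemaSupplier.get(true)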