LUCENE-9516: Remove DocConsumer and IndexingChain from Lucene (#1867)

This removes the ability to replace the IndexingChain / DocConsumer
in Lucene's IndexWriter. The interface is not sufficient to efficiently
replace the functionality with reasonable effort. It also seems to be completely
unused at this point and hasn't been maintained in years.
This commit is contained in:
Simon Willnauer 2020-09-15 10:15:24 +02:00 committed by GitHub
parent 7d62cad1a8
commit f655d97b54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 20 additions and 109 deletions

View File

@ -61,6 +61,10 @@ API Changes
* LUCENE-9462: Fields without positions should still return MatchIterator. * LUCENE-9462: Fields without positions should still return MatchIterator.
(Alan Woodward, Dawid Weiss) (Alan Woodward, Dawid Weiss)
* LUCENE-9516: Removed the ability to replace the IndexingChain / DocConsumer
in Lucene's IndexWriter. The interface is not sufficient to efficiently
replace the functionality with reasonable effort. (Simon Willnauer)
Improvements Improvements
* LUCENE-9463: Query match region retrieval component, passage scoring and formatting * LUCENE-9463: Query match region retrieval component, passage scoring and formatting

View File

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Accountable;
/**
* Package-private abstraction of the component that consumes documents during indexing.
* Besides processing documents it can be flushed or aborted, and it reports its memory
* usage through {@link Accountable}.
*/
abstract class DocConsumer implements Accountable {
/**
* Processes one document.
*
* @param docId id under which the document is processed
* @param document the fields that make up the document
* @throws IOException declared so that implementations may perform I/O
*/
abstract void processDocument(int docId, Iterable<? extends IndexableField> document) throws IOException;
/**
* Flushes this consumer using the given segment write state.
*
* @param state write state of the segment being flushed
* @return a {@link Sorter.DocMap}; NOTE(review): presumably the document reordering produced
* by index sorting, and possibly null when nothing is reordered -- confirm against implementations
* @throws IOException declared so that implementations may perform I/O
*/
abstract Sorter.DocMap flush(final SegmentWriteState state) throws IOException;
/**
* Aborts this consumer.
* NOTE(review): presumably discards pending state and releases held resources -- confirm
* against implementations.
*
* @throws IOException declared so that implementations may perform I/O
*/
abstract void abort() throws IOException;
/**
* Returns a {@link DocIdSetIterator} for the given field or null if the field doesn't have
* doc values.
*/
abstract DocIdSetIterator getHasDocValues(String field);
}

View File

@ -27,7 +27,6 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice; import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
@ -47,17 +46,6 @@ import org.apache.lucene.util.Version;
final class DocumentsWriterPerThread implements Accountable { final class DocumentsWriterPerThread implements Accountable {
/**
* The IndexingChain must define the {@link #getChain(int, SegmentInfo, Directory, FieldInfos.Builder, LiveIndexWriterConfig, Consumer)} method
* which returns the DocConsumer that the DocumentsWriter calls to process the
* documents.
*/
abstract static class IndexingChain {
/**
* Creates the {@link DocConsumer} for one segment.
*
* @param indexCreatedVersionMajor major version the index was created with
* @param segmentInfo the segment the returned consumer works on
* @param directory directory handed to the consumer
* @param fieldInfos builder for the field infos of the segment
* @param indexWriterConfig the live writer configuration
* @param abortingExceptionConsumer callback that receives a {@link Throwable};
* NOTE(review): presumably invoked when an exception must abort the segment -- confirm
* @return the consumer that processes documents
*/
abstract DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
Consumer<Throwable> abortingExceptionConsumer);
}
private Throwable abortingException; private Throwable abortingException;
private void onAbortingException(Throwable throwable) { private void onAbortingException(Throwable throwable) {
@ -70,16 +58,6 @@ final class DocumentsWriterPerThread implements Accountable {
return aborted; return aborted;
} }
/**
* The default {@link IndexingChain}: its {@code getChain} creates a new
* {@code DefaultIndexingChain}, passing all arguments through unchanged.
*/
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@Override
DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
Consumer<Throwable> abortingExceptionConsumer) {
// A fresh chain instance is built on every call; nothing is cached here.
return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory, fieldInfos, indexWriterConfig, abortingExceptionConsumer);
}
};
static final class FlushedSegment { static final class FlushedSegment {
final SegmentCommitInfo segmentInfo; final SegmentCommitInfo segmentInfo;
final FieldInfos fieldInfos; final FieldInfos fieldInfos;
@ -111,7 +89,7 @@ final class DocumentsWriterPerThread implements Accountable {
infoStream.message("DWPT", "now abort"); infoStream.message("DWPT", "now abort");
} }
try { try {
consumer.abort(); indexingChain.abort();
} finally { } finally {
pendingUpdates.clear(); pendingUpdates.clear();
} }
@ -124,7 +102,7 @@ final class DocumentsWriterPerThread implements Accountable {
private final static boolean INFO_VERBOSE = false; private final static boolean INFO_VERBOSE = false;
final Codec codec; final Codec codec;
final TrackingDirectoryWrapper directory; final TrackingDirectoryWrapper directory;
private final DocConsumer consumer; private final IndexingChain indexingChain;
// Updates for our still-in-RAM (to be flushed next) segment // Updates for our still-in-RAM (to be flushed next) segment
private final BufferedUpdates pendingUpdates; private final BufferedUpdates pendingUpdates;
@ -167,7 +145,7 @@ final class DocumentsWriterPerThread implements Accountable {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue); infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
} }
this.enableTestPoints = enableTestPoints; this.enableTestPoints = enableTestPoints;
consumer = indexWriterConfig.getIndexingChain().getChain(indexVersionCreated, segmentInfo, this.directory, fieldInfos, indexWriterConfig, this::onAbortingException); indexingChain = new IndexingChain(indexVersionCreated, segmentInfo, this.directory, fieldInfos, indexWriterConfig, this::onAbortingException);
} }
final void testPoint(String message) { final void testPoint(String message) {
@ -205,7 +183,7 @@ final class DocumentsWriterPerThread implements Accountable {
// it's very hard to fix (we can't easily distinguish aborting // it's very hard to fix (we can't easily distinguish aborting
// vs non-aborting exceptions): // vs non-aborting exceptions):
reserveOneDoc(); reserveOneDoc();
consumer.processDocument(numDocsInRAM++, doc); indexingChain.processDocument(numDocsInRAM++, doc);
} }
allDocsIndexed = true; allDocsIndexed = true;
return finishDocuments(deleteNode, docsInRamBefore); return finishDocuments(deleteNode, docsInRamBefore);
@ -343,11 +321,11 @@ final class DocumentsWriterPerThread implements Accountable {
try { try {
DocIdSetIterator softDeletedDocs; DocIdSetIterator softDeletedDocs;
if (indexWriterConfig.getSoftDeletesField() != null) { if (indexWriterConfig.getSoftDeletesField() != null) {
softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField()); softDeletedDocs = indexingChain.getHasDocValues(indexWriterConfig.getSoftDeletesField());
} else { } else {
softDeletedDocs = null; softDeletedDocs = null;
} }
sortMap = consumer.flush(flushState); sortMap = indexingChain.flush(flushState);
if (softDeletedDocs == null) { if (softDeletedDocs == null) {
flushState.softDelCountOnFlush = 0; flushState.softDelCountOnFlush = 0;
} else { } else {
@ -518,12 +496,12 @@ final class DocumentsWriterPerThread implements Accountable {
@Override @Override
public long ramBytesUsed() { public long ramBytesUsed() {
return (deleteDocIDs.length * Integer.BYTES)+ pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed(); return (deleteDocIDs.length * Integer.BYTES)+ pendingUpdates.ramBytesUsed() + indexingChain.ramBytesUsed();
} }
@Override @Override
public Collection<Accountable> getChildResources() { public Collection<Accountable> getChildResources() {
return List.of(pendingUpdates, consumer); return List.of(pendingUpdates, indexingChain);
} }
@Override @Override

View File

@ -57,8 +57,7 @@ import org.apache.lucene.util.RamUsageEstimator;
/** Default general purpose indexing chain, which handles /** Default general purpose indexing chain, which handles
* indexing all types of fields. */ * indexing all types of fields. */
final class DefaultIndexingChain extends DocConsumer { final class IndexingChain implements Accountable {
final Counter bytesUsed = Counter.newCounter(); final Counter bytesUsed = Counter.newCounter();
final FieldInfos.Builder fieldInfos; final FieldInfos.Builder fieldInfos;
@ -88,7 +87,7 @@ final class DefaultIndexingChain extends DocConsumer {
private final Consumer<Throwable> abortingExceptionConsumer; private final Consumer<Throwable> abortingExceptionConsumer;
private boolean hasHitAbortingException; private boolean hasHitAbortingException;
DefaultIndexingChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory, FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig, IndexingChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory, FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
Consumer<Throwable> abortingExceptionConsumer) { Consumer<Throwable> abortingExceptionConsumer) {
this.indexCreatedVersionMajor = indexCreatedVersionMajor; this.indexCreatedVersionMajor = indexCreatedVersionMajor;
byteBlockAllocator = new ByteBlockPool.DirectTrackingAllocator(bytesUsed); byteBlockAllocator = new ByteBlockPool.DirectTrackingAllocator(bytesUsed);
@ -207,8 +206,7 @@ final class DefaultIndexingChain extends DocConsumer {
return sorter.sort(state.segmentInfo.maxDoc(), comparators.toArray(IndexSorter.DocComparator[]::new)); return sorter.sort(state.segmentInfo.maxDoc(), comparators.toArray(IndexSorter.DocComparator[]::new));
} }
@Override Sorter.DocMap flush(SegmentWriteState state) throws IOException {
public Sorter.DocMap flush(SegmentWriteState state) throws IOException {
// NOTE: caller (DocumentsWriterPerThread) handles // NOTE: caller (DocumentsWriterPerThread) handles
// aborting on any exception from this method // aborting on any exception from this method
@ -408,9 +406,8 @@ final class DefaultIndexingChain extends DocConsumer {
} }
} }
@Override
@SuppressWarnings("try") @SuppressWarnings("try")
public void abort() throws IOException{ void abort() throws IOException{
// finalizer will e.g. close any open files in the term vectors writer: // finalizer will e.g. close any open files in the term vectors writer:
try (Closeable finalizer = termsHash::abort){ try (Closeable finalizer = termsHash::abort){
storedFieldsConsumer.abort(); storedFieldsConsumer.abort();
@ -464,8 +461,7 @@ final class DefaultIndexingChain extends DocConsumer {
} }
} }
@Override void processDocument(int docID, Iterable<? extends IndexableField> document) throws IOException {
public void processDocument(int docID, Iterable<? extends IndexableField> document) throws IOException {
// How many indexed field names we've seen (collapses // How many indexed field names we've seen (collapses
// multiple field instances by the same name): // multiple field instances by the same name):
@ -1008,7 +1004,6 @@ final class DefaultIndexingChain extends DocConsumer {
} }
} }
@Override
DocIdSetIterator getHasDocValues(String field) { DocIdSetIterator getHasDocValues(String field) {
PerField perField = getPerField(field); PerField perField = getPerField(field);
if (perField != null) { if (perField != null) {

View File

@ -22,7 +22,6 @@ import java.util.Set;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
@ -67,10 +66,6 @@ public class LiveIndexWriterConfig {
/** {@link MergeScheduler} to use for running merges. */ /** {@link MergeScheduler} to use for running merges. */
protected volatile MergeScheduler mergeScheduler; protected volatile MergeScheduler mergeScheduler;
/** {@link IndexingChain} that determines how documents are
* indexed. */
protected volatile IndexingChain indexingChain;
/** {@link Codec} used to write new segments. */ /** {@link Codec} used to write new segments. */
protected volatile Codec codec; protected volatile Codec codec;
@ -124,7 +119,6 @@ public class LiveIndexWriterConfig {
openMode = OpenMode.CREATE_OR_APPEND; openMode = OpenMode.CREATE_OR_APPEND;
similarity = IndexSearcher.getDefaultSimilarity(); similarity = IndexSearcher.getDefaultSimilarity();
mergeScheduler = new ConcurrentMergeScheduler(); mergeScheduler = new ConcurrentMergeScheduler();
indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
codec = Codec.getDefault(); codec = Codec.getDefault();
if (codec == null) { if (codec == null) {
throw new NullPointerException(); throw new NullPointerException();
@ -353,13 +347,6 @@ public class LiveIndexWriterConfig {
return readerPooling; return readerPooling;
} }
/**
* Returns the indexing chain.
*
* @return the current value of the {@code indexingChain} field
*/
IndexingChain getIndexingChain() {
return indexingChain;
}
/** /**
* Returns the max amount of memory each {@link DocumentsWriterPerThread} can * Returns the max amount of memory each {@link DocumentsWriterPerThread} can
* consume until forcefully flushed. * consume until forcefully flushed.

View File

@ -3385,7 +3385,7 @@ public class TestIndexWriter extends LuceneTestCase {
try (Directory dir = new FilterDirectory(newDirectory()) { try (Directory dir = new FilterDirectory(newDirectory()) {
@Override @Override
public IndexOutput createOutput(String name, IOContext context) throws IOException { public IndexOutput createOutput(String name, IOContext context) throws IOException {
if (callStackContains(DefaultIndexingChain.class, "flush")) { if (callStackContains(IndexingChain.class, "flush")) {
try { try {
inFlush.countDown(); inFlush.countDown();
latch.await(); latch.await();

View File

@ -22,13 +22,11 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.function.Consumer;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.similarities.ClassicSimilarity; import org.apache.lucene.search.similarities.ClassicSimilarity;
@ -43,17 +41,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
// Does not implement anything - used only for type checking on IndexWriterConfig. // Does not implement anything - used only for type checking on IndexWriterConfig.
} }
/**
* Stub {@link IndexingChain} for this test class; its {@code getChain} always returns
* {@code null}.
*/
private static final class MyIndexingChain extends IndexingChain {
@Override
DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
Consumer<Throwable> abortingExceptionConsumer) {
// No consumer is ever created by this stub.
return null;
}
// Does not implement anything - used only for type checking on IndexWriterConfig.
}
@Test @Test
public void testDefaults() throws Exception { public void testDefaults() throws Exception {
IndexWriterConfig conf = new IndexWriterConfig(new MockAnalyzer(random())); IndexWriterConfig conf = new IndexWriterConfig(new MockAnalyzer(random()));
@ -67,7 +54,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertEquals(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB, conf.getRAMBufferSizeMB(), 0.0); assertEquals(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB, conf.getRAMBufferSizeMB(), 0.0);
assertEquals(IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS, conf.getMaxBufferedDocs()); assertEquals(IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS, conf.getMaxBufferedDocs());
assertEquals(IndexWriterConfig.DEFAULT_READER_POOLING, conf.getReaderPooling()); assertEquals(IndexWriterConfig.DEFAULT_READER_POOLING, conf.getReaderPooling());
assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain());
assertNull(conf.getMergedSegmentWarmer()); assertNull(conf.getMergedSegmentWarmer());
assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass()); assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass());
assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass()); assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass());
@ -232,9 +218,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
conf.setSimilarity(null); conf.setSimilarity(null);
}); });
// Test IndexingChain
assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain());
expectThrows(IllegalArgumentException.class, () -> { expectThrows(IllegalArgumentException.class, () -> {
conf.setMaxBufferedDocs(1); conf.setMaxBufferedDocs(1);
}); });

View File

@ -453,7 +453,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
@Override @Override
public void eval(MockDirectoryWrapper dir) throws IOException { public void eval(MockDirectoryWrapper dir) throws IOException {
if (doFail) { if (doFail) {
if (callStackContains(DefaultIndexingChain.class, "flush")) { if (callStackContains(IndexingChain.class, "flush")) {
if (onlyOnce) if (onlyOnce)
doFail = false; doFail = false;
//System.out.println(Thread.currentThread().getName() + ": NOW FAIL: onlyOnce=" + onlyOnce); //System.out.println(Thread.currentThread().getName() + ": NOW FAIL: onlyOnce=" + onlyOnce);