diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 5407e6ba6ec..b3caabde88d 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -56,6 +56,10 @@ Improvements
 * LUCENE-8770: BlockMaxConjunctionScorer now leverages two-phase iterators in order to avoid
   executing the second phase when scorers don't intersect. (Adrien Grand, Jim Ferenczi)
 
+* LUCENE-8757: When provided with an ExecutorService to run queries across
+  multiple threads, IndexSearcher now groups small segments together, up to
+  250k docs per slice. (Atri Sharma via Adrien Grand)
+
 ======================= Lucene 8.1.1 =======================
 
 (No Changes)
diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index 689409f5cc3..1748dd48216 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -100,6 +101,13 @@ public class IndexSearcher {
    */
  private static final int TOTAL_HITS_THRESHOLD = 1000;
 
+  /**
+   * Thresholds for index slice allocation logic. To change the default, extend
+   * IndexSearcher and use custom values
+   */
+  private static final int MAX_DOCS_PER_SLICE = 250_000;
+  private static final int MAX_SEGMENTS_PER_SLICE = 5;
+
   final IndexReader reader; // package private for testing!
 
   // NOTE: these members might change in incompatible ways
@@ -268,17 +276,60 @@ public class IndexSearcher {
 
   /**
    * Expert: Creates an array of leaf slices each holding a subset of the given leaves.
-   * Each {@link LeafSlice} is executed in a single thread. By default there
-   * will be one {@link LeafSlice} per leaf ({@link org.apache.lucene.index.LeafReaderContext}).
+   * Each {@link LeafSlice} is executed in a single thread. By default, segments with more than
+   * MAX_DOCS_PER_SLICE will get their own thread
    */
   protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
-    LeafSlice[] slices = new LeafSlice[leaves.size()];
-    for (int i = 0; i < slices.length; i++) {
-      slices[i] = new LeafSlice(leaves.get(i));
+    return slices(leaves, MAX_DOCS_PER_SLICE, MAX_SEGMENTS_PER_SLICE);
+  }
+
+  /**
+   * Static method to segregate LeafReaderContexts amongst multiple slices
+   */
+  public static LeafSlice[] slices (List<LeafReaderContext> leaves, int maxDocsPerSlice,
+                                    int maxSegmentsPerSlice) {
+    // Make a copy so we can sort:
+    List<LeafReaderContext> sortedLeaves = new ArrayList<>(leaves);
+
+    // Sort by maxDoc, descending:
+    Collections.sort(sortedLeaves,
+        Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));
+
+    final List<List<LeafReaderContext>> groupedLeaves = new ArrayList<>();
+    long docSum = 0;
+    List<LeafReaderContext> group = null;
+    for (LeafReaderContext ctx : sortedLeaves) {
+      if (ctx.reader().maxDoc() > maxDocsPerSlice) {
+        assert group == null;
+        groupedLeaves.add(Collections.singletonList(ctx));
+      } else {
+        if (group == null) {
+          group = new ArrayList<>();
+          group.add(ctx);
+
+          groupedLeaves.add(group);
+        } else {
+          group.add(ctx);
+        }
+
+        docSum += ctx.reader().maxDoc();
+        if (group.size() >= maxSegmentsPerSlice || docSum > maxDocsPerSlice) {
+          group = null;
+          docSum = 0;
+        }
+      }
     }
+
+    LeafSlice[] slices = new LeafSlice[groupedLeaves.size()];
+    int upto = 0;
+    for (List<LeafReaderContext> currentLeaf : groupedLeaves) {
+      slices[upto] = new LeafSlice(currentLeaf);
+      ++upto;
+    }
+
     return slices;
   }
-  
+
   /** Return the {@link IndexReader} this searches. */
   public IndexReader getIndexReader() {
     return reader;
@@ -743,8 +794,9 @@ public class IndexSearcher {
      * @lucene.experimental */
     public final LeafReaderContext[] leaves;
 
-    public LeafSlice(LeafReaderContext... leaves) {
-      this.leaves = leaves;
+    public LeafSlice(List<LeafReaderContext> leavesList) {
+      Collections.sort(leavesList, Comparator.comparingInt(l -> l.docBase));
+      this.leaves = leavesList.toArray(new LeafReaderContext[0]);
     }
   }
 
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSegmentToThreadMapping.java b/lucene/core/src/test/org/apache/lucene/index/TestSegmentToThreadMapping.java
new file mode 100644
index 00000000000..62299dfd919
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestSegmentToThreadMapping.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.Version;
+
+public class TestSegmentToThreadMapping extends LuceneTestCase {
+
+  public LeafReader dummyIndexReader(final int maxDoc) {
+    return new LeafReader() {
+      @Override
+      public int maxDoc() {
+        return maxDoc;
+      }
+
+      @Override
+      public int numDocs() {
+        return maxDoc;
+      }
+
+      @Override
+      public FieldInfos getFieldInfos() {
+        return FieldInfos.EMPTY;
+      }
+
+      @Override
+      public Bits getLiveDocs() {
+        return null;
+      }
+
+      @Override
+      public Terms terms(String field) throws IOException {
+        return null;
+      }
+
+      @Override
+      public Fields getTermVectors(int doc) {
+        return null;
+      }
+
+      @Override
+      public NumericDocValues getNumericDocValues(String field) {
+        return null;
+      }
+
+      @Override
+      public BinaryDocValues getBinaryDocValues(String field) {
+        return null;
+      }
+
+      @Override
+      public SortedDocValues getSortedDocValues(String field) {
+        return null;
+      }
+
+      @Override
+      public SortedNumericDocValues getSortedNumericDocValues(String field) {
+        return null;
+      }
+
+      @Override
+      public SortedSetDocValues getSortedSetDocValues(String field) {
+        return null;
+      }
+
+      @Override
+      public NumericDocValues getNormValues(String field) {
+        return null;
+      }
+
+      @Override
+      public PointValues getPointValues(String field) {
+        return null;
+      }
+
+      @Override
+      protected void doClose() {
+      }
+
+      @Override
+      public void document(int doc, StoredFieldVisitor visitor) {
+      }
+
+      @Override
+      public void checkIntegrity() throws IOException {
+      }
+
+      @Override
+      public LeafMetaData getMetaData() {
+        return new LeafMetaData(Version.LATEST.major, Version.LATEST, null);
+      }
+
+      @Override
+      public CacheHelper getCoreCacheHelper() {
+        return null;
+      }
+
+      @Override
+      public CacheHelper getReaderCacheHelper() {
+        return null;
+      }
+    };
+  }
+
+  public void testSingleSlice() {
+    LeafReader largeSegmentReader = dummyIndexReader(50_000);
+    LeafReader firstMediumSegmentReader = dummyIndexReader(30_000);
+    LeafReader secondMediumSegmentReader = dummyIndexReader(30_000);
+    LeafReader thirdMediumSegmentReader = dummyIndexReader(30_000);
+    List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
+
+    leafReaderContexts.add(new LeafReaderContext(largeSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(firstMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(secondMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(thirdMediumSegmentReader));
+
+    IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
+
+    assertTrue(resultSlices.length == 1);
+
+    final LeafReaderContext[] leaves = resultSlices[0].leaves;
+
+    assertTrue(leaves.length == 4);
+  }
+
+  public void testSmallSegments() {
+    LeafReader firstMediumSegmentReader = dummyIndexReader(10_000);
+    LeafReader secondMediumSegmentReader = dummyIndexReader(10_000);
+    LeafReader thirdMediumSegmentReader = dummyIndexReader(10_000);
+    LeafReader fourthMediumSegmentReader = dummyIndexReader(10_000);
+    LeafReader fifthMediumSegmentReader = dummyIndexReader(10_000);
+    LeafReader sixthMediumSegmentReader = dummyIndexReader(10_000);
+    LeafReader seventhLargeSegmentReader = dummyIndexReader(130_000);
+    LeafReader eighthLargeSegmentReader = dummyIndexReader(130_000);
+    List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
+
+    leafReaderContexts.add(new LeafReaderContext(firstMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(secondMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(thirdMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(fourthMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(fifthMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(sixthMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(seventhLargeSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(eighthLargeSegmentReader));
+
+    IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
+
+    assertTrue(resultSlices.length == 3);
+
+    final LeafReaderContext[] firstSliceleaves = resultSlices[0].leaves;
+    final LeafReaderContext[] secondSliceleaves = resultSlices[1].leaves;
+    final LeafReaderContext[] thirdSliceleaves = resultSlices[2].leaves;
+
+    assertTrue(firstSliceleaves.length == 2);
+    assertTrue(secondSliceleaves.length == 5);
+    assertTrue(thirdSliceleaves.length == 1);
+  }
+
+  public void testLargeSlices() {
+    LeafReader largeSegmentReader = dummyIndexReader(290_900);
+    LeafReader firstMediumSegmentReader = dummyIndexReader(170_000);
+    LeafReader secondMediumSegmentReader = dummyIndexReader(170_000);
+    LeafReader thirdMediumSegmentReader = dummyIndexReader(170_000);
+    List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
+
+    leafReaderContexts.add(new LeafReaderContext(largeSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(firstMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(secondMediumSegmentReader));
+    leafReaderContexts.add(new LeafReaderContext(thirdMediumSegmentReader));
+
+    IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
+
+    assertTrue(resultSlices.length == 3);
+
+    final LeafReaderContext[] firstSliceleaves = resultSlices[0].leaves;
+    final LeafReaderContext[] secondSliceleaves = resultSlices[1].leaves;
+    final LeafReaderContext[] thirdSliceleaves = resultSlices[2].leaves;
+
+    assertTrue(firstSliceleaves.length == 1);
+    assertTrue(secondSliceleaves.length == 2);
+    assertTrue(thirdSliceleaves.length == 1);
+  }
+
+  public void testIntraSliceDocIDOrder() throws Exception {
+    Directory dir = newDirectory();
+    RandomIndexWriter w = new RandomIndexWriter(random(), dir);
+    w.addDocument(new Document());
+    w.addDocument(new Document());
+    w.commit();
+    w.addDocument(new Document());
+    w.addDocument(new Document());
+    w.commit();
+    IndexReader r = w.getReader();
+    w.close();
+
+    ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        new NamedThreadFactory("TestSegmentToThreadMapping"));
+    IndexSearcher s = new IndexSearcher(r, service);
+    Query query = new MatchAllDocsQuery();
+
+    s.search(query, Integer.MAX_VALUE);
+
+    IndexSearcher.LeafSlice[] slices = s.getSlices();
+    assertNotNull(slices);
+
+    for (IndexSearcher.LeafSlice leafSlice : slices) {
+      LeafReaderContext[] leafReaderContexts = leafSlice.leaves;
+      int previousDocBase = leafReaderContexts[0].docBase;
+
+      for (LeafReaderContext leafReaderContext : leafReaderContexts) {
+        assertTrue(previousDocBase <= leafReaderContext.docBase);
+        previousDocBase = leafReaderContext.docBase;
+      }
+    }
+
+    service.shutdown();
+    IOUtils.close(r, dir);
+  }
+
+  public void testRandom() {
+    List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
+    int max = 500_000;
+    int min = 10_000;
+    int numSegments = 1 + random().nextInt(50);
+
+    for (int i = 0; i < numSegments; i++) {
+      leafReaderContexts.add(new LeafReaderContext(dummyIndexReader(random().nextInt((max - min) + 1) + min)));
+    }
+
+    IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
+
+    assertTrue(resultSlices.length > 0);
+  }
+}
diff --git a/lucene/test-framework/src/java/org/apache/lucene/search/AssertingCollector.java b/lucene/test-framework/src/java/org/apache/lucene/search/AssertingCollector.java
index 125d681298e..bf842cefb68 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/search/AssertingCollector.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/search/AssertingCollector.java
@@ -26,6 +26,7 @@ import org.apache.lucene.index.LeafReaderContext;
 class AssertingCollector extends FilterCollector {
 
   private int maxDoc = -1;
+  private int previousLeafMaxDoc = 0;
 
   /** Wrap the given collector in order to add assertions. */
   public static Collector wrap(Collector in) {
@@ -41,6 +42,9 @@ class AssertingCollector extends FilterCollector {
 
   @Override
   public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+    assert context.docBase >= previousLeafMaxDoc;
+    previousLeafMaxDoc = context.docBase + context.reader().maxDoc();
+
     final LeafCollector in = super.getLeafCollector(context);
     final int docBase = context.docBase;
     return new AssertingLeafCollector(in, 0, DocIdSetIterator.NO_MORE_DOCS) {
@@ -50,6 +54,7 @@ class AssertingCollector extends FilterCollector {
         // not only per segment
         assert docBase + doc >= maxDoc : "collection is not in order: current doc="
             + (docBase + doc) + " while " + maxDoc + " has already been collected";
+
         super.collect(doc);
         maxDoc = docBase + doc;
       }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index 6aa6f03753f..92d364d04bf 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1932,6 +1932,15 @@ public abstract class LuceneTestCase extends Assert {
         ret = random.nextBoolean()
             ? new AssertingIndexSearcher(random, r, ex)
             : new AssertingIndexSearcher(random, r.getContext(), ex);
+      } else if (random.nextBoolean()) {
+        int maxDocPerSlice = 1 + random.nextInt(100000);
+        int maxSegmentsPerSlice = 1 + random.nextInt(20);
+        ret = new IndexSearcher(r, ex) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            return slices(leaves, maxDocPerSlice, maxSegmentsPerSlice);
+          }
+        };
       } else {
         ret = random.nextBoolean()
             ? new IndexSearcher(r, ex)
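For illustration, here is a minimal sketch (not part of the patch) of how an application could choose thresholds other than the new defaults of 250,000 documents and 5 segments per slice: subclass IndexSearcher, override slices(List<LeafReaderContext>), and delegate to the public static slices(leaves, maxDocsPerSlice, maxSegmentsPerSlice), just as the LuceneTestCase hunk above does. The class name and the 100_000 docs / 3 segments values below are made up for the example, not Lucene defaults.

    // Sketch only, not part of the patch: custom slice thresholds via an IndexSearcher subclass.
    // The class name and the 100_000 / 3 thresholds are illustrative assumptions.
    import java.util.List;
    import java.util.concurrent.ExecutorService;

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.IndexSearcher;

    public class SmallSliceIndexSearcher extends IndexSearcher {

      public SmallSliceIndexSearcher(IndexReader reader, ExecutorService executor) {
        super(reader, executor);
      }

      @Override
      protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
        // Group leaves into slices of at most 100_000 docs and 3 segments each,
        // instead of the default 250_000 docs and 5 segments.
        return slices(leaves, 100_000, 3);
      }
    }

As the CHANGES entry notes, these slices only come into play when the IndexSearcher is constructed with an ExecutorService; without one, the search still runs sequentially in the calling thread.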