mirror of https://github.com/apache/lucene.git
LUCENE-8757: Improving Default Segments To Thread Mapping Algorithm
The current slicing algorithm assigns a thread per segment, which can be detrimental to performance in case the distribution has a large number of small segments. The patch introduces a slicing algorithm which coalesces smaller segments to a single thread, thus reducing the impact of context switching by limiting the number of threads Signed-off-by: Adrien Grand <jpountz@gmail.com>
This commit is contained in:
parent
5a694ea26f
commit
87e936f1bb
|
@ -56,6 +56,10 @@ Improvements
|
|||
* LUCENE-8770: BlockMaxConjunctionScorer now leverages two-phase iterators in order to avoid
|
||||
executing the second phase when scorers don't intersect. (Adrien Grand, Jim Ferenczi)
|
||||
|
||||
* LUCENE-8757: When provided with an ExecutorService to run queries across
|
||||
multiple threads, IndexSearcher now groups small segments together, up to
|
||||
250k docs per slice. (Atri Sharma via Adrien Grand)
|
||||
|
||||
======================= Lucene 8.1.1 =======================
|
||||
(No Changes)
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
@ -100,6 +101,13 @@ public class IndexSearcher {
|
|||
*/
|
||||
private static final int TOTAL_HITS_THRESHOLD = 1000;
|
||||
|
||||
/**
|
||||
* Thresholds for index slice allocation logic. To change the default, extend
|
||||
* <code> IndexSearcher</code> and use custom values
|
||||
*/
|
||||
private static final int MAX_DOCS_PER_SLICE = 250_000;
|
||||
private static final int MAX_SEGMENTS_PER_SLICE = 5;
|
||||
|
||||
final IndexReader reader; // package private for testing!
|
||||
|
||||
// NOTE: these members might change in incompatible ways
|
||||
|
@ -268,17 +276,60 @@ public class IndexSearcher {
|
|||
|
||||
/**
|
||||
* Expert: Creates an array of leaf slices each holding a subset of the given leaves.
|
||||
* Each {@link LeafSlice} is executed in a single thread. By default there
|
||||
* will be one {@link LeafSlice} per leaf ({@link org.apache.lucene.index.LeafReaderContext}).
|
||||
* Each {@link LeafSlice} is executed in a single thread. By default, segments with more than
|
||||
* MAX_DOCS_PER_SLICE will get their own thread
|
||||
*/
|
||||
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
|
||||
LeafSlice[] slices = new LeafSlice[leaves.size()];
|
||||
for (int i = 0; i < slices.length; i++) {
|
||||
slices[i] = new LeafSlice(leaves.get(i));
|
||||
return slices(leaves, MAX_DOCS_PER_SLICE, MAX_SEGMENTS_PER_SLICE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Static method to segregate LeafReaderContexts amongst multiple slices
|
||||
*/
|
||||
public static LeafSlice[] slices (List<LeafReaderContext> leaves, int maxDocsPerSlice,
|
||||
int maxSegmentsPerSlice) {
|
||||
// Make a copy so we can sort:
|
||||
List<LeafReaderContext> sortedLeaves = new ArrayList<>(leaves);
|
||||
|
||||
// Sort by maxDoc, descending:
|
||||
Collections.sort(sortedLeaves,
|
||||
Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));
|
||||
|
||||
final List<List<LeafReaderContext>> groupedLeaves = new ArrayList<>();
|
||||
long docSum = 0;
|
||||
List<LeafReaderContext> group = null;
|
||||
for (LeafReaderContext ctx : sortedLeaves) {
|
||||
if (ctx.reader().maxDoc() > maxDocsPerSlice) {
|
||||
assert group == null;
|
||||
groupedLeaves.add(Collections.singletonList(ctx));
|
||||
} else {
|
||||
if (group == null) {
|
||||
group = new ArrayList<>();
|
||||
group.add(ctx);
|
||||
|
||||
groupedLeaves.add(group);
|
||||
} else {
|
||||
group.add(ctx);
|
||||
}
|
||||
|
||||
docSum += ctx.reader().maxDoc();
|
||||
if (group.size() >= maxSegmentsPerSlice || docSum > maxDocsPerSlice) {
|
||||
group = null;
|
||||
docSum = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LeafSlice[] slices = new LeafSlice[groupedLeaves.size()];
|
||||
int upto = 0;
|
||||
for (List<LeafReaderContext> currentLeaf : groupedLeaves) {
|
||||
slices[upto] = new LeafSlice(currentLeaf);
|
||||
++upto;
|
||||
}
|
||||
|
||||
return slices;
|
||||
}
|
||||
|
||||
|
||||
/** Return the {@link IndexReader} this searches. */
|
||||
public IndexReader getIndexReader() {
|
||||
return reader;
|
||||
|
@ -743,8 +794,9 @@ public class IndexSearcher {
|
|||
* @lucene.experimental */
|
||||
public final LeafReaderContext[] leaves;
|
||||
|
||||
public LeafSlice(LeafReaderContext... leaves) {
|
||||
this.leaves = leaves;
|
||||
public LeafSlice(List<LeafReaderContext> leavesList) {
|
||||
Collections.sort(leavesList, Comparator.comparingInt(l -> l.docBase));
|
||||
this.leaves = leavesList.toArray(new LeafReaderContext[0]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,268 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.NamedThreadFactory;
|
||||
import org.apache.lucene.util.Version;
|
||||
|
||||
public class TestSegmentToThreadMapping extends LuceneTestCase {
|
||||
|
||||
public LeafReader dummyIndexReader(final int maxDoc) {
|
||||
return new LeafReader() {
|
||||
@Override
|
||||
public int maxDoc() {
|
||||
return maxDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numDocs() {
|
||||
return maxDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldInfos getFieldInfos() {
|
||||
return FieldInfos.EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bits getLiveDocs() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Terms terms(String field) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Fields getTermVectors(int doc) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumericDocValues getNumericDocValues(String field) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BinaryDocValues getBinaryDocValues(String field) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedDocValues getSortedDocValues(String field) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedNumericDocValues getSortedNumericDocValues(String field) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedSetDocValues getSortedSetDocValues(String field) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumericDocValues getNormValues(String field) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PointValues getPointValues(String field) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void document(int doc, StoredFieldVisitor visitor) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkIntegrity() throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public LeafMetaData getMetaData() {
|
||||
return new LeafMetaData(Version.LATEST.major, Version.LATEST, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheHelper getCoreCacheHelper() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheHelper getReaderCacheHelper() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void testSingleSlice() {
|
||||
LeafReader largeSegmentReader = dummyIndexReader(50_000);
|
||||
LeafReader firstMediumSegmentReader = dummyIndexReader(30_000);
|
||||
LeafReader secondMediumSegmentReader = dummyIndexReader(30__000);
|
||||
LeafReader thirdMediumSegmentReader = dummyIndexReader(30_000);
|
||||
List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
|
||||
|
||||
leafReaderContexts.add(new LeafReaderContext(largeSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(firstMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(secondMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(thirdMediumSegmentReader));
|
||||
|
||||
IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
|
||||
|
||||
assertTrue(resultSlices.length == 1);
|
||||
|
||||
final LeafReaderContext[] leaves = resultSlices[0].leaves;
|
||||
|
||||
assertTrue(leaves.length == 4);
|
||||
}
|
||||
|
||||
public void testSmallSegments() {
|
||||
LeafReader firstMediumSegmentReader = dummyIndexReader(10_000);
|
||||
LeafReader secondMediumSegmentReader = dummyIndexReader(10_000);
|
||||
LeafReader thirdMediumSegmentReader = dummyIndexReader(10_000);
|
||||
LeafReader fourthMediumSegmentReader = dummyIndexReader(10_000);
|
||||
LeafReader fifthMediumSegmentReader = dummyIndexReader(10_000);
|
||||
LeafReader sixthMediumSegmentReader = dummyIndexReader(10_000);
|
||||
LeafReader seventhLargeSegmentReader = dummyIndexReader(130_000);
|
||||
LeafReader eigthLargeSegmentReader = dummyIndexReader(130_000);
|
||||
List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
|
||||
|
||||
leafReaderContexts.add(new LeafReaderContext(firstMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(secondMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(thirdMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(fourthMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(fifthMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(sixthMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(seventhLargeSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(eigthLargeSegmentReader));
|
||||
|
||||
IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
|
||||
|
||||
assertTrue(resultSlices.length == 3);
|
||||
|
||||
final LeafReaderContext[] firstSliceleaves = resultSlices[0].leaves;
|
||||
final LeafReaderContext[] secondSliceleaves = resultSlices[1].leaves;
|
||||
final LeafReaderContext[] thirdSliceleaves = resultSlices[2].leaves;
|
||||
|
||||
assertTrue(firstSliceleaves.length == 2);
|
||||
assertTrue(secondSliceleaves.length == 5);
|
||||
assertTrue(thirdSliceleaves.length == 1);
|
||||
}
|
||||
|
||||
public void testLargeSlices() {
|
||||
LeafReader largeSegmentReader = dummyIndexReader(290_900);
|
||||
LeafReader firstMediumSegmentReader = dummyIndexReader(170_000);
|
||||
LeafReader secondMediumSegmentReader = dummyIndexReader(170_000);
|
||||
LeafReader thirdMediumSegmentReader = dummyIndexReader(170_000);
|
||||
List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
|
||||
|
||||
leafReaderContexts.add(new LeafReaderContext(largeSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(firstMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(secondMediumSegmentReader));
|
||||
leafReaderContexts.add(new LeafReaderContext(thirdMediumSegmentReader));
|
||||
|
||||
IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
|
||||
|
||||
assertTrue(resultSlices.length == 3);
|
||||
|
||||
final LeafReaderContext[] firstSliceleaves = resultSlices[0].leaves;
|
||||
final LeafReaderContext[] secondSliceleaves = resultSlices[1].leaves;
|
||||
final LeafReaderContext[] thirdSliceleaves = resultSlices[2].leaves;
|
||||
|
||||
assertTrue(firstSliceleaves.length == 1);
|
||||
assertTrue(secondSliceleaves.length == 2);
|
||||
assertTrue(thirdSliceleaves.length == 1);
|
||||
}
|
||||
|
||||
public void testIntraSliceDocIDOrder() throws Exception {
|
||||
Directory dir = newDirectory();
|
||||
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
|
||||
w.addDocument(new Document());
|
||||
w.addDocument(new Document());
|
||||
w.commit();
|
||||
w.addDocument(new Document());
|
||||
w.addDocument(new Document());
|
||||
w.commit();
|
||||
IndexReader r = w.getReader();
|
||||
w.close();
|
||||
|
||||
ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(),
|
||||
new NamedThreadFactory("TestSegmentToThreadMapping"));
|
||||
IndexSearcher s = new IndexSearcher(r, service);
|
||||
Query query = new MatchAllDocsQuery();
|
||||
|
||||
s.search(query, Integer.MAX_VALUE);
|
||||
|
||||
IndexSearcher.LeafSlice[] slices = s.getSlices();
|
||||
assertNotNull(slices);
|
||||
|
||||
for (IndexSearcher.LeafSlice leafSlice : slices) {
|
||||
LeafReaderContext[] leafReaderContexts = leafSlice.leaves;
|
||||
int previousDocBase = leafReaderContexts[0].docBase;
|
||||
|
||||
for (LeafReaderContext leafReaderContext : leafReaderContexts) {
|
||||
assertTrue(previousDocBase <= leafReaderContext.docBase);
|
||||
previousDocBase = leafReaderContext.docBase;
|
||||
}
|
||||
}
|
||||
|
||||
service.shutdown();
|
||||
IOUtils.close(r, dir);
|
||||
}
|
||||
|
||||
public void testRandom() {
|
||||
List<LeafReaderContext> leafReaderContexts = new ArrayList<>();
|
||||
int max = 500_000;
|
||||
int min = 10_000;
|
||||
int numSegments = random().nextInt(50);
|
||||
|
||||
for (int i = 0; i < numSegments; i++) {
|
||||
leafReaderContexts.add(new LeafReaderContext(dummyIndexReader(random().nextInt((max - min) + 1) + min)));
|
||||
}
|
||||
|
||||
IndexSearcher.LeafSlice[] resultSlices = IndexSearcher.slices(leafReaderContexts, 250_000, 5);
|
||||
|
||||
assertTrue(resultSlices.length > 0);
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.index.LeafReaderContext;
|
|||
class AssertingCollector extends FilterCollector {
|
||||
|
||||
private int maxDoc = -1;
|
||||
private int previousLeafMaxDoc = 0;
|
||||
|
||||
/** Wrap the given collector in order to add assertions. */
|
||||
public static Collector wrap(Collector in) {
|
||||
|
@ -41,6 +42,9 @@ class AssertingCollector extends FilterCollector {
|
|||
|
||||
@Override
|
||||
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
|
||||
assert context.docBase >= previousLeafMaxDoc;
|
||||
previousLeafMaxDoc = context.docBase + context.reader().maxDoc();
|
||||
|
||||
final LeafCollector in = super.getLeafCollector(context);
|
||||
final int docBase = context.docBase;
|
||||
return new AssertingLeafCollector(in, 0, DocIdSetIterator.NO_MORE_DOCS) {
|
||||
|
@ -50,6 +54,7 @@ class AssertingCollector extends FilterCollector {
|
|||
// not only per segment
|
||||
assert docBase + doc >= maxDoc : "collection is not in order: current doc="
|
||||
+ (docBase + doc) + " while " + maxDoc + " has already been collected";
|
||||
|
||||
super.collect(doc);
|
||||
maxDoc = docBase + doc;
|
||||
}
|
||||
|
|
|
@ -1932,6 +1932,15 @@ public abstract class LuceneTestCase extends Assert {
|
|||
ret = random.nextBoolean()
|
||||
? new AssertingIndexSearcher(random, r, ex)
|
||||
: new AssertingIndexSearcher(random, r.getContext(), ex);
|
||||
} else if (random.nextBoolean()) {
|
||||
int maxDocPerSlice = 1 + random.nextInt(100000);
|
||||
int maxSegmentsPerSlice = 1 + random.nextInt(20);
|
||||
ret = new IndexSearcher(r, ex) {
|
||||
@Override
|
||||
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
|
||||
return slices(leaves, maxDocPerSlice, maxSegmentsPerSlice);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
ret = random.nextBoolean()
|
||||
? new IndexSearcher(r, ex)
|
||||
|
|
Loading…
Reference in New Issue