From ff8b81afc59dc09ea4dfdd02a58398f08fcad8a8 Mon Sep 17 00:00:00 2001
From: Adrien Grand
Date: Thu, 12 Sep 2024 14:43:26 +0200
Subject: [PATCH] Integrate merge-time index reordering with the intra-merge
 executor. (#13289)

Index reordering can benefit greatly from parallelism, so it should try to
use the intra-merge executor when possible.
---
 .../org/apache/lucene/index/IndexWriter.java  |  19 ++-
 .../org/apache/lucene/index/MergePolicy.java  |   8 +-
 .../lucene/misc/index/BPIndexReorderer.java   | 125 ++++++++++--------
 .../misc/index/BPReorderingMergePolicy.java   |   6 +-
 .../misc/index/TestBPIndexReorderer.java      |   8 +-
 .../tests/index/MockRandomMergePolicy.java    |  21 ++-
 6 files changed, 114 insertions(+), 73 deletions(-)
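For illustration (not part of the patch): with this change, the executor returned by MergeScheduler#getIntraMergeExecutor is fetched once per merge and handed both to MergePolicy#reorder and to the segment merger. A minimal sketch of an application-side setup under which merge-time reordering can now run in parallel, assuming the stock ConcurrentMergeScheduler; the class name, helper name, and variables are placeholders:

    import java.io.IOException;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;

    class IntraMergeReorderingSetup {
      // Merges triggered by this writer pass the scheduler's intra-merge executor
      // to MergePolicy#reorder as well as to the segment merger.
      static void forceMergeWithParallelReordering(Directory dir, Analyzer analyzer)
          throws IOException {
        IndexWriterConfig config = new IndexWriterConfig(analyzer);
        config.setMergeScheduler(new ConcurrentMergeScheduler());
        try (IndexWriter writer = new IndexWriter(dir, config)) {
          writer.forceMerge(1); // merge-time reordering may now use multiple threads
        }
      }
    }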
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 7d1a57bb5ad..def9ef06fce 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -37,6 +37,7 @@ import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -3434,9 +3435,11 @@ public class IndexWriter
             .map(FieldInfos::getParentField)
             .anyMatch(Objects::isNull);

+    final Executor intraMergeExecutor = mergeScheduler.getIntraMergeExecutor(merge);
+
    if (hasIndexSort == false && hasBlocksButNoParentField == false && readers.isEmpty() == false) {
      CodecReader mergedReader = SlowCompositeCodecReaderWrapper.wrap(readers);
-      DocMap docMap = merge.reorder(mergedReader, directory);
+      DocMap docMap = merge.reorder(mergedReader, directory, intraMergeExecutor);
      if (docMap != null) {
        readers = Collections.singletonList(SortingCodecReader.wrap(mergedReader, docMap, null));
      }
@@ -3450,7 +3453,7 @@ public class IndexWriter
             trackingDir,
             globalFieldNumberMap,
             context,
-            mergeScheduler.getIntraMergeExecutor(merge));
+            intraMergeExecutor);

     if (!merger.shouldMerge()) {
       return;
@@ -3928,9 +3931,9 @@ public class IndexWriter
       }

       @Override
-      public Sorter.DocMap reorder(CodecReader reader, Directory dir)
-          throws IOException {
-        return toWrap.reorder(reader, dir); // must delegate
+      public Sorter.DocMap reorder(
+          CodecReader reader, Directory dir, Executor executor) throws IOException {
+        return toWrap.reorder(reader, dir, executor); // must delegate
       }

       @Override
@@ -5205,6 +5208,8 @@ public class IndexWriter
       mergeReaders.add(wrappedReader);
     }

+    final Executor intraMergeExecutor = mergeScheduler.getIntraMergeExecutor(merge);
+
     MergeState.DocMap[] reorderDocMaps = null;
     // Don't reorder if an explicit sort is configured.
     final boolean hasIndexSort = config.getIndexSort() != null;
@@ -5219,7 +5224,7 @@ public class IndexWriter
     if (hasIndexSort == false && hasBlocksButNoParentField == false) {
       // Create a merged view of the input segments. This effectively does the merge.
       CodecReader mergedView = SlowCompositeCodecReaderWrapper.wrap(mergeReaders);
-      Sorter.DocMap docMap = merge.reorder(mergedView, directory);
+      Sorter.DocMap docMap = merge.reorder(mergedView, directory, intraMergeExecutor);
       if (docMap != null) {
         reorderDocMaps = new MergeState.DocMap[mergeReaders.size()];
         int docBase = 0;
@@ -5249,7 +5254,7 @@ public class IndexWriter
             dirWrapper,
             globalFieldNumberMap,
             context,
-            mergeScheduler.getIntraMergeExecutor(merge));
+            intraMergeExecutor);

     merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
     merge.checkAborted();
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 2eaab9c58f8..d66f5648c03 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -292,7 +293,7 @@ public abstract class MergePolicy {
    * Wrap a reader prior to merging in order to add/remove fields or documents.
    *
    * <p>NOTE: It is illegal to reorder doc IDs here, use {@link
-   * #reorder(CodecReader,Directory)} instead.
+   * #reorder(CodecReader,Directory,Executor)} instead.
    */
   public CodecReader wrapForMerge(CodecReader reader) throws IOException {
     return reader;
@@ -308,9 +309,12 @@ public abstract class MergePolicy {
    *
    * @param reader The reader to reorder.
    * @param dir The {@link Directory} of the index, which may be used to create temporary files.
+   * @param executor An executor that can be used to parallelize the reordering logic. May be
+   *     {@code null} if no concurrency is supported.
    * @lucene.experimental
    */
-  public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException {
+  public Sorter.DocMap reorder(CodecReader reader, Directory dir, Executor executor)
+      throws IOException {
     return null;
   }
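A sketch of what the new callback looks like to a MergePolicy implementor; the subclass below is hypothetical and simply documents the contract (a null executor means no concurrency is available, and returning null keeps the existing doc ID order):

    import java.io.IOException;
    import java.util.concurrent.Executor;
    import org.apache.lucene.index.CodecReader;
    import org.apache.lucene.index.FilterMergePolicy;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.Sorter;
    import org.apache.lucene.store.Directory;

    // Hypothetical subclass, for illustration only.
    class MyReorderingMergePolicy extends FilterMergePolicy {
      MyReorderingMergePolicy(MergePolicy in) {
        super(in);
      }

      @Override
      public Sorter.DocMap reorder(CodecReader reader, Directory dir, Executor executor)
          throws IOException {
        // `executor` is the merge scheduler's intra-merge executor, or null when the
        // scheduler offers no intra-merge concurrency. Returning null leaves doc IDs
        // in their current order.
        return null;
      }
    }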

diff --git a/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java b/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java
index 43b6c7b9c04..f51321fe842 100644
--- a/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java
+++ b/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java
@@ -22,8 +22,8 @@ import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.RecursiveAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.DocValues;
@@ -37,6 +37,7 @@ import org.apache.lucene.index.SortingCodecReader;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.TaskExecutor;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
@@ -123,7 +124,6 @@ public final class BPIndexReorderer {
   private float maxDocFreq;
   private int minPartitionSize;
   private int maxIters;
-  private ForkJoinPool forkJoinPool;
   private double ramBudgetMB;
   private Set<String> fields;

@@ -133,7 +133,6 @@ public final class BPIndexReorderer {
     setMaxDocFreq(1f);
     setMinPartitionSize(DEFAULT_MIN_PARTITION_SIZE);
     setMaxIters(DEFAULT_MAX_ITERS);
-    setForkJoinPool(null);
     // 10% of the available heap size by default
     setRAMBudgetMB(Runtime.getRuntime().totalMemory() / 1024d / 1024d / 10d);
     setFields(null);
@@ -181,20 +180,6 @@ public final class BPIndexReorderer {
     this.maxIters = maxIters;
   }

-  /**
-   * Set the {@link ForkJoinPool} to run graph partitioning concurrently.
-   *
-   * <p>NOTE: A value of {@code null} can be used to run in the current thread, which is the
-   * default.
-   */
-  public void setForkJoinPool(ForkJoinPool forkJoinPool) {
-    this.forkJoinPool = forkJoinPool;
-  }
-
-  private int getParallelism() {
-    return forkJoinPool == null ? 1 : forkJoinPool.getParallelism();
-  }
-
   /**
    * Set the amount of RAM that graph partitioning is allowed to use. More RAM allows running
    * faster. If not enough RAM is provided, a {@link NotEnoughRAMException} will be thrown. This is
@@ -225,21 +210,18 @@ public final class BPIndexReorderer {
     }
   }

-  private abstract class BaseRecursiveAction extends RecursiveAction {
+  private abstract class BaseRecursiveAction implements Callable<Void> {

+    protected final TaskExecutor executor;
     protected final int depth;

-    BaseRecursiveAction(int depth) {
+    BaseRecursiveAction(TaskExecutor executor, int depth) {
+      this.executor = executor;
       this.depth = depth;
     }

     protected final boolean shouldFork(int problemSize, int totalProblemSize) {
-      if (forkJoinPool == null) {
-        return false;
-      }
-      if (getSurplusQueuedTaskCount() > 3) {
-        // Fork tasks if this worker doesn't have more queued work than other workers
-        // See javadocs of #getSurplusQueuedTaskCount for more details
+      if (executor == null) {
         return false;
       }
       if (problemSize == totalProblemSize) {
@@ -249,6 +231,18 @@
       }
       return problemSize > FORK_THRESHOLD;
     }
+
+    @Override
+    public abstract Void call();
+
+    protected final void invokeAll(BaseRecursiveAction... actions) {
+      assert executor != null : "Only call invokeAll if shouldFork returned true";
+      try {
+        executor.invokeAll(Arrays.asList(actions));
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
   }

   private class IndexReorderingTask extends BaseRecursiveAction {
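The conversion above trades ForkJoinPool's work-stealing heuristic (getSurplusQueuedTaskCount) for a simple problem-size check. A standalone sketch of the fork-or-run-inline pattern that BaseRecursiveAction now follows, assuming Lucene's TaskExecutor API; the class name and threshold are illustrative:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Arrays;
    import java.util.concurrent.Callable;
    import org.apache.lucene.search.TaskExecutor;

    abstract class ForkingTask implements Callable<Void> {
      static final int FORK_THRESHOLD = 8192; // illustrative cut-off
      protected final TaskExecutor executor; // null disables forking entirely

      ForkingTask(TaskExecutor executor) {
        this.executor = executor;
      }

      // Fork only when an executor is available and the sub-problem is large
      // enough to amortize task-scheduling overhead.
      protected final boolean shouldFork(int problemSize) {
        return executor != null && problemSize > FORK_THRESHOLD;
      }

      // TaskExecutor#invokeAll runs the tasks on the wrapped executor (the
      // calling thread may help out) and waits for all of them to complete.
      protected final void invokeAll(ForkingTask... tasks) {
        try {
          executor.invokeAll(Arrays.asList(tasks));
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    }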
@@ -263,8 +257,9 @@
         float[] biases,
         CloseableThreadLocal<PerThreadState> threadLocal,
         BitSet parents,
+        TaskExecutor executor,
         int depth) {
-      super(depth);
+      super(executor, depth);
       this.docIDs = docIDs;
       this.biases = biases;
       this.threadLocal = threadLocal;
@@ -292,7 +287,7 @@
     }

     @Override
-    protected void compute() {
+    public Void call() {
       if (depth > 0) {
         Arrays.sort(docIDs.ints, docIDs.offset, docIDs.offset + docIDs.length);
       } else {
@@ -302,7 +297,7 @@

       int halfLength = docIDs.length / 2;
       if (halfLength < minPartitionSize) {
-        return;
+        return null;
       }

       IntsRef left = new IntsRef(docIDs.ints, docIDs.offset, halfLength);
@@ -349,7 +344,7 @@
         if (split == docIDs.offset) {
           // No good split on the left side either: this slice has a single parent document, no
           // reordering is possible. Stop recursing.
-          return;
+          return null;
         }
       }

@@ -362,16 +357,17 @@
       // It is fine for all tasks to share the same docs / biases array since they all work on
       // different slices of the array at a given point in time.
       IndexReorderingTask leftTask =
-          new IndexReorderingTask(left, biases, threadLocal, parents, depth + 1);
+          new IndexReorderingTask(left, biases, threadLocal, parents, executor, depth + 1);
       IndexReorderingTask rightTask =
-          new IndexReorderingTask(right, biases, threadLocal, parents, depth + 1);
+          new IndexReorderingTask(right, biases, threadLocal, parents, executor, depth + 1);

       if (shouldFork(docIDs.length, docIDs.ints.length)) {
         invokeAll(leftTask, rightTask);
       } else {
-        leftTask.compute();
-        rightTask.compute();
+        leftTask.call();
+        rightTask.call();
       }
+      return null;
     }

     // used for asserts
@@ -422,8 +418,9 @@
                 leftDocFreqs,
                 rightDocFreqs,
                 threadLocal,
+                executor,
                 depth)
-            .compute();
+            .call();

       if (parents != null) {
         for (int i = docIDs.offset, end = docIDs.offset + docIDs.length; i < end; ) {
@@ -592,8 +589,9 @@
         int[] fromDocFreqs,
         int[] toDocFreqs,
         CloseableThreadLocal<PerThreadState> threadLocal,
+        TaskExecutor executor,
         int depth) {
-      super(depth);
+      super(executor, depth);
       this.docs = docs;
       this.biases = biases;
       this.from = from;
@@ -604,15 +602,15 @@
     }

     @Override
-    protected void compute() {
+    public Void call() {
       final int problemSize = to - from;
       if (problemSize > 1 && shouldFork(problemSize, docs.length)) {
         final int mid = (from + to) >>> 1;
         invokeAll(
             new ComputeBiasTask(
-                docs, biases, from, mid, fromDocFreqs, toDocFreqs, threadLocal, depth),
+                docs, biases, from, mid, fromDocFreqs, toDocFreqs, threadLocal, executor, depth),
             new ComputeBiasTask(
-                docs, biases, mid, to, fromDocFreqs, toDocFreqs, threadLocal, depth));
+                docs, biases, mid, to, fromDocFreqs, toDocFreqs, threadLocal, executor, depth));
       } else {
         ForwardIndex forwardIndex = threadLocal.get().forwardIndex;
         try {
@@ -623,6 +621,7 @@
           throw new UncheckedIOException(e);
         }
       }
+      return null;
     }

     /**
@@ -707,12 +706,16 @@
   }

   private int writePostings(
-      CodecReader reader, Set<String> fields, Directory tempDir, DataOutput postingsOut)
+      CodecReader reader,
+      Set<String> fields,
+      Directory tempDir,
+      DataOutput postingsOut,
+      int parallelism)
       throws IOException {
     final int maxNumTerms =
         (int)
             ((ramBudgetMB * 1024 * 1024 - docRAMRequirements(reader.maxDoc()))
-                / getParallelism()
+                / parallelism
                 / termRAMRequirementsPerThreadPerTerm());
     final int maxDocFreq = (int) ((double) this.maxDocFreq * reader.maxDoc());
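For intuition, the per-thread term budget above shrinks linearly with the assumed parallelism. A worked example with made-up numbers (the real per-doc and per-term constants live in docRAMRequirements() and termRAMRequirementsPerThreadPerTerm(); these figures are assumptions, not Lucene's):

    // Illustrative budget arithmetic only:
    //   ramBudgetMB * 1024 * 1024           = 1 GiB total budget
    //   docRAMRequirements(maxDoc)          = 200 MiB reserved for per-doc state
    //   parallelism                         = 8 threads
    //   termRAMRequirementsPerThreadPerTerm = 16 bytes
    // maxNumTerms = (1 GiB - 200 MiB) / 8 / 16 ≈ 6.75M terms tracked at once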
@@ -825,9 +828,10 @@
   /**
    * Expert: Compute the {@link DocMap} that holds the new doc ID numbering. This is exposed to
    * enable integration into {@link BPReorderingMergePolicy}, {@link #reorder(CodecReader,
-   * Directory)} should be preferred in general.
+   * Directory, Executor)} should be preferred in general.
    */
-  public Sorter.DocMap computeDocMap(CodecReader reader, Directory tempDir) throws IOException {
+  public Sorter.DocMap computeDocMap(CodecReader reader, Directory tempDir, Executor executor)
+      throws IOException {
     if (docRAMRequirements(reader.maxDoc()) >= ramBudgetMB * 1024 * 1024) {
       throw new NotEnoughRAMException(
           "At least "
@@ -847,7 +851,8 @@
       }
     }

-    int[] newToOld = computePermutation(reader, fields, tempDir);
+    TaskExecutor taskExecutor = executor == null ? null : new TaskExecutor(executor);
+    int[] newToOld = computePermutation(reader, fields, tempDir, taskExecutor);
     int[] oldToNew = new int[newToOld.length];
     for (int i = 0; i < newToOld.length; ++i) {
       oldToNew[newToOld[i]] = i;
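A standalone usage sketch of the executor-aware entry point; the reader, directory, and pool sizing are placeholders:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.lucene.index.CodecReader;
    import org.apache.lucene.misc.index.BPIndexReorderer;
    import org.apache.lucene.store.Directory;

    class ReorderExample {
      static CodecReader reorderWithThreads(CodecReader codecReader, Directory tempDir)
          throws Exception {
        BPIndexReorderer reorderer = new BPIndexReorderer();
        ExecutorService threads =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
          // Passing null instead of `threads` would run single-threaded.
          return reorderer.reorder(codecReader, tempDir, threads);
        } finally {
          threads.shutdown();
        }
      }
    }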

@@ -877,27 +882,42 @@
    * evaluation efficiency. Note that the returned {@link CodecReader} is slow and should typically
    * be used in a call to {@link IndexWriter#addIndexes(CodecReader...)}.
    *
+   * <p>The provided {@link Executor} can be used to perform reordering concurrently. A value of
+   * {@code null} indicates that reordering should be performed in the current thread.
+   *
+   * <p>NOTE: The provided {@link Executor} must not reject tasks.
+   *
    * @throws NotEnoughRAMException if not enough RAM is provided
    */
-  public CodecReader reorder(CodecReader reader, Directory tempDir) throws IOException {
-    Sorter.DocMap docMap = computeDocMap(reader, tempDir);
+  public CodecReader reorder(CodecReader reader, Directory tempDir, Executor executor)
+      throws IOException {
+    Sorter.DocMap docMap = computeDocMap(reader, tempDir, executor);
     return SortingCodecReader.wrap(reader, docMap, null);
   }

   /**
    * Compute a permutation of the doc ID space that reduces log gaps between consecutive postings.
    */
-  private int[] computePermutation(CodecReader reader, Set<String> fields, Directory dir)
+  private int[] computePermutation(
+      CodecReader reader, Set<String> fields, Directory dir, TaskExecutor executor)
       throws IOException {
     TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
+    final int parallelism;
+    if (executor == null) {
+      parallelism = 1;
+    } else {
+      // Assume as many threads as processors
+      parallelism = Runtime.getRuntime().availableProcessors();
+    }
+
     final int maxDoc = reader.maxDoc();
     ForwardIndex forwardIndex = null;
     IndexOutput postingsOutput = null;
     boolean success = false;
     try {
       postingsOutput = trackingDir.createTempOutput("postings", "", IOContext.DEFAULT);
-      int numTerms = writePostings(reader, fields, trackingDir, postingsOutput);
+      int numTerms = writePostings(reader, fields, trackingDir, postingsOutput, parallelism);
       CodecUtil.writeFooter(postingsOutput);
       postingsOutput.close();
       final ForwardIndex finalForwardIndex =
@@ -924,14 +944,7 @@
             }
           }) {
       IntsRef docs = new IntsRef(sortedDocs, 0, sortedDocs.length);
-      IndexReorderingTask task =
-          new IndexReorderingTask(docs, new float[maxDoc], threadLocal, parents, 0);
-      if (forkJoinPool != null) {
-        forkJoinPool.execute(task);
-        task.join();
-      } else {
-        task.compute();
-      }
+      new IndexReorderingTask(docs, new float[maxDoc], threadLocal, parents, executor, 0).call();
     }

     success = true;
diff --git a/lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java b/lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java
index 077b3891556..5cd363192fa 100644
--- a/lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java
+++ b/lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java
@@ -19,6 +19,7 @@ package org.apache.lucene.misc.index;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.FilterMergePolicy;
 import org.apache.lucene.index.MergePolicy;
@@ -129,11 +130,12 @@ public final class BPReorderingMergePolicy extends FilterMergePolicy {
   }

   @Override
-  public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException {
+  public Sorter.DocMap reorder(CodecReader reader, Directory dir, Executor executor)
+      throws IOException {
     Sorter.DocMap docMap = null;
     if (reader.numDocs() >= minNumDocs) {
       try {
-        docMap = reorderer.computeDocMap(reader, dir);
+        docMap = reorderer.computeDocMap(reader, dir, executor);
       } catch (
           @SuppressWarnings("unused")
           NotEnoughRAMException e) {
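Hypothetical wiring of the merge policy after this change; note that no ForkJoinPool needs to be configured anymore, since the executor now arrives from the merge scheduler at merge time:

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.misc.index.BPIndexReorderer;
    import org.apache.lucene.misc.index.BPReorderingMergePolicy;

    class MergePolicySetup {
      static IndexWriterConfig configure(Analyzer analyzer) {
        BPIndexReorderer reorderer = new BPIndexReorderer();
        reorderer.setRAMBudgetMB(256); // illustrative budget
        return new IndexWriterConfig(analyzer)
            .setMergePolicy(new BPReorderingMergePolicy(new TieredMergePolicy(), reorderer));
      }
    }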
diff --git a/lucene/misc/src/test/org/apache/lucene/misc/index/TestBPIndexReorderer.java b/lucene/misc/src/test/org/apache/lucene/misc/index/TestBPIndexReorderer.java
index f4322ff600a..b7da3088df2 100644
--- a/lucene/misc/src/test/org/apache/lucene/misc/index/TestBPIndexReorderer.java
+++ b/lucene/misc/src/test/org/apache/lucene/misc/index/TestBPIndexReorderer.java
@@ -116,11 +116,10 @@ public class TestBPIndexReorderer extends LuceneTestCase {
     CodecReader codecReader = SlowCodecReaderWrapper.wrap(leafRealer);

     BPIndexReorderer reorderer = new BPIndexReorderer();
-    reorderer.setForkJoinPool(pool);
     reorderer.setMinDocFreq(2);
     reorderer.setMinPartitionSize(1);
     reorderer.setMaxIters(10);
-    CodecReader reordered = reorderer.reorder(codecReader, dir);
+    CodecReader reordered = reorderer.reorder(codecReader, dir, pool);
     String[] ids = new String[codecReader.maxDoc()];
     StoredFields storedFields = reordered.storedFields();
     for (int i = 0; i < codecReader.maxDoc(); ++i) {
@@ -180,11 +179,10 @@
     CodecReader codecReader = SlowCodecReaderWrapper.wrap(leafRealer);

     BPIndexReorderer reorderer = new BPIndexReorderer();
-    reorderer.setForkJoinPool(pool);
     reorderer.setMinDocFreq(2);
     reorderer.setMinPartitionSize(1);
     reorderer.setMaxIters(10);
-    CodecReader reordered = reorderer.reorder(codecReader, dir);
+    CodecReader reordered = reorderer.reorder(codecReader, dir, pool);

     StoredFields storedFields = reordered.storedFields();
     assertEquals("2", storedFields.document(0).get("id"));
@@ -307,7 +305,7 @@
     reorderer.setMinDocFreq(2);
     reorderer.setMinPartitionSize(1);
     reorderer.setMaxIters(10);
-    CodecReader reordered = reorderer.reorder(codecReader, dir);
+    CodecReader reordered = reorderer.reorder(codecReader, dir, null);
     String[] ids = new String[codecReader.maxDoc()];
     StoredFields storedFields = reordered.storedFields();
     for (int i = 0; i < codecReader.maxDoc(); ++i) {
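The tests can drop setForkJoinPool() because ForkJoinPool already implements java.util.concurrent.Executor, so the same pool is handed straight to reorder(); the third call site passes null to exercise the sequential path. A sketch of the typing relationship this relies on (the helper is hypothetical):

    import java.io.IOException;
    import java.util.concurrent.ForkJoinPool;
    import org.apache.lucene.index.CodecReader;
    import org.apache.lucene.misc.index.BPIndexReorderer;
    import org.apache.lucene.store.Directory;

    class TestStyleUsage {
      // ForkJoinPool is an Executor, so it satisfies the new parameter directly.
      static CodecReader reorderOnPool(
          BPIndexReorderer reorderer, CodecReader codecReader, Directory dir, ForkJoinPool pool)
          throws IOException {
        return reorderer.reorder(codecReader, dir, pool);
      }
    }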
diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java
index 1b509c7c396..74f3b87ed5d 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java
@@ -23,6 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.FilterLeafReader;
@@ -36,6 +39,8 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;

 /** MergePolicy that makes random decisions for testing. */
 public class MockRandomMergePolicy extends MergePolicy {
@@ -241,7 +246,8 @@ public class MockRandomMergePolicy extends MergePolicy {
     }

     @Override
-    public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException {
+    public Sorter.DocMap reorder(CodecReader reader, Directory dir, Executor executor)
+        throws IOException {
       if (r.nextBoolean()) {
         if (LuceneTestCase.VERBOSE) {
           System.out.println("NOTE: MockRandomMergePolicy now reverses reader=" + reader);
@@ -249,6 +255,19 @@
         // Reverse the doc ID order
         return reverse(reader);
       }
+      if (executor != null && r.nextBoolean()) {
+        // submit random work to the executor
+        Runnable dummyRunnable = () -> {};
+        FutureTask<Void> task = new FutureTask<>(dummyRunnable, null);
+        executor.execute(task);
+        try {
+          task.get();
+        } catch (InterruptedException e) {
+          throw new ThreadInterruptedException(e);
+        } catch (ExecutionException e) {
+          throw IOUtils.rethrowAlways(e.getCause());
+        }
+      }
       return null;
     }
   }
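Because reorder() implementations may now submit work, the executor handed to a merge must not reject tasks (as the BPIndexReorderer javadoc above notes); the mock exercises this by submitting a no-op and propagating any failure. A defensive wrapper on the application side might look like this (hypothetical helper, not part of the patch):

    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.RejectedExecutionException;

    class NonRejectingExecutor {
      // Degrades to running the task on the caller thread instead of throwing,
      // so a saturated pool slows reordering down rather than failing the merge.
      static Executor wrap(ExecutorService pool) {
        return task -> {
          try {
            pool.execute(task);
          } catch (RejectedExecutionException e) {
            task.run();
          }
        };
      }
    }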