diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index d5c3f130430..9016a503511 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -139,6 +139,10 @@ New Features
   allows exact hit extraction and will enable implementation of accurate
   highlighters. (Alan Woodward, Adrien Grand, David Smiley)
 
+* LUCENE-8246: Allow customizing the number of deletes a merge claims. This
+  helps merge policies in the soft-delete case to correctly implement retention
+  policies without triggering unnecessary merges. (Simon Willnauer, Mike McCandless)
+
 Bug Fixes
 
 * LUCENE-8234: Fixed bug in how spatial relationship is computed for
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 8ba460d70b5..e973b91f47e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -5221,4 +5221,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
     return false;
   }
+
+  /**
+   * Returns the number of deletes a merge would claim back if the given segment is merged.
+   * @see MergePolicy#numDeletesToMerge(SegmentCommitInfo, int, org.apache.lucene.util.IOSupplier)
+   * @param info the segment to get the number of deletes for
+   * @lucene.experimental
+   */
+  public final int numDeletesToMerge(SegmentCommitInfo info) throws IOException {
+    MergePolicy mergePolicy = config.getMergePolicy();
+    final ReadersAndUpdates rld = readerPool.get(info, false);
+    int numDeletesToMerge;
+    if (rld != null) {
+      numDeletesToMerge = rld.numDeletesToMerge(mergePolicy);
+    } else {
+      numDeletesToMerge = mergePolicy.numDeletesToMerge(info, 0, null);
+    }
+    assert numDeletesToMerge <= info.info.maxDoc() :
+        "numDeletesToMerge: " + numDeletesToMerge + " > maxDoc: " + info.info.maxDoc();
+    return numDeletesToMerge;
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java
index 78025634b4a..134f6ff848f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java
@@ -150,7 +150,7 @@ public abstract class LogMergePolicy extends MergePolicy {
    *  #setCalibrateSizeByDeletes} is set. */
   protected long sizeDocs(SegmentCommitInfo info, IndexWriter writer) throws IOException {
     if (calibrateSizeByDeletes) {
-      int delCount = writer.numDeletedDocs(info);
+      int delCount = writer.numDeletesToMerge(info);
       assert delCount <= info.info.maxDoc();
       return (info.info.maxDoc() - (long)delCount);
     } else {
@@ -388,7 +388,7 @@ public abstract class LogMergePolicy extends MergePolicy {
     assert writer != null;
     for(int i=0;i<numSegments;i++) {
       final SegmentCommitInfo info = segmentInfos.info(i);
-      int delCount = writer.numDeletedDocs(info);
+      int delCount = writer.numDeletesToMerge(info);
       if (delCount > 0) {
         if (verbose(writer)) {
           message("  segment " + info.info.name + " has deletions", writer);
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index c0d9748b6fd..8212c4a3f2c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -30,8 +30,10 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
+import org.apache.lucene.document.Field;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.util.IOSupplier;
 
 /**
  * <p>Expert: a MergePolicy determines the sequence of
@@ -310,7 +312,7 @@ public abstract class MergePolicy {
      * input total size. This is only set once the merge is
      * initialized by IndexWriter.
      */
-    public long totalBytesSize() throws IOException {
+    public long totalBytesSize() {
       return totalMergeBytes;
     }
 
@@ -318,7 +320,7 @@ public abstract class MergePolicy {
      * Returns the total number of documents that are included with this merge.
      * Note that this does not indicate the number of documents after the merge.
      * */
-    public int totalNumDocs() throws IOException {
+    public int totalNumDocs() {
       int total = 0;
       for (SegmentCommitInfo info : segments) {
         total += info.info.maxDoc();
@@ -551,7 +553,7 @@ public abstract class MergePolicy {
    *  non-deleted documents is set. */
   protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
     long byteSize = info.sizeInBytes();
-    int delCount = writer.numDeletedDocs(info);
+    int delCount = writer.numDeletesToMerge(info);
     double delRatio = info.info.maxDoc() <= 0 ? 0.0f : (float) delCount / (float) info.info.maxDoc();
     assert delRatio <= 1.0;
     return (info.info.maxDoc() <= 0 ? byteSize : (long) (byteSize * (1.0 - delRatio)));
@@ -562,7 +564,7 @@ public abstract class MergePolicy {
    *  writer, and matches the current compound file setting */
   protected final boolean isMerged(SegmentInfos infos, SegmentCommitInfo info, IndexWriter writer) throws IOException {
     assert writer != null;
-    boolean hasDeletions = writer.numDeletedDocs(info) > 0;
+    boolean hasDeletions = writer.numDeletesToMerge(info) > 0;
     return !hasDeletions &&
       info.info.dir == writer.getDirectory() &&
       useCompoundFile(infos, info, writer) == info.info.getUseCompoundFile();
@@ -612,4 +614,21 @@ public abstract class MergePolicy {
   public boolean keepFullyDeletedSegment(CodecReader reader) throws IOException {
     return false;
   }
+
+  /**
+   * Returns the number of deletes that a merge would claim on the given segment. By default this method returns
+   * the sum of the del count on disk and the pending delete count. Yet, subclasses that wrap merge readers
+   * might modify this to reflect deletes that are carried over to the target segment in the case of soft deletes.
+   *
+   * Soft deletes require deletes to survive across merges in order to control when the soft-deleted data is claimed.
+   * @see IndexWriter#softUpdateDocument(Term, Iterable, Field...)
+   * @see IndexWriterConfig#setSoftDeletesField(String)
+   * @param info the segment info that identifies the segment
+   * @param pendingDeleteCount the number of pending deletes for this segment
+   * @param readerSupplier a supplier that allows obtaining a {@link CodecReader} for this segment
+   */
+  public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount,
+                               IOSupplier<CodecReader> readerSupplier) throws IOException {
+    return info.getDelCount() + pendingDeleteCount;
+  }
 }
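
The hunk above is the core API change: MergePolicy#numDeletesToMerge becomes an overridable hook whose default is simply the on-disk delete count plus the pending deletes. As a rough sketch of how a custom policy could plug into this hook (not part of this patch: the MinDeletesMergePolicy name and its threshold behavior are invented for illustration), a subclass of the MergePolicyWrapper class changed in the next file might hide deletes from its delegate until a segment has accumulated enough of them:

import java.io.IOException;

import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergePolicyWrapper;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.util.IOSupplier;

/** Hypothetical example: report zero deletes until a segment carries at least minDeletes of them. */
public class MinDeletesMergePolicy extends MergePolicyWrapper {

  private final int minDeletes;

  public MinDeletesMergePolicy(MergePolicy in, int minDeletes) {
    super(in);
    this.minDeletes = minDeletes;
  }

  @Override
  public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount,
                               IOSupplier<CodecReader> readerSupplier) throws IOException {
    int numDeletes = super.numDeletesToMerge(info, pendingDeleteCount, readerSupplier);
    // below the threshold the deletes are not worth claiming yet, so the wrapped policy
    // sees the segment as if it had no deletes at all
    return numDeletes >= minDeletes ? numDeletes : 0;
  }
}
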
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicyWrapper.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicyWrapper.java
index 606f3c2bd45..c7124add4a4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicyWrapper.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicyWrapper.java
@@ -19,6 +19,8 @@ package org.apache.lucene.index;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.lucene.util.IOSupplier;
+
 /**
  * A wrapper for {@link MergePolicy} instances.
  * <p>
@@ -90,4 +92,9 @@ public class MergePolicyWrapper extends MergePolicy {
   public boolean keepFullyDeletedSegment(CodecReader reader) throws IOException {
     return in.keepFullyDeletedSegment(reader);
   }
+
+  @Override
+  public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount, IOSupplier<CodecReader> readerSupplier) throws IOException {
+    return in.numDeletesToMerge(info, pendingDeleteCount, readerSupplier);
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
index 4387f250513..08f900a32b8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
@@ -20,6 +20,8 @@ package org.apache.lucene.index;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.lucene.util.IOSupplier;
+
 /**
  * A {@link MergePolicy} which never returns merges to execute. Use it if you
  * want to prevent segment merges.
@@ -73,6 +75,11 @@ public final class NoMergePolicy extends MergePolicy {
     return super.keepFullyDeletedSegment(reader);
   }
 
+  @Override
+  public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount, IOSupplier<CodecReader> readerSupplier) throws IOException {
+    return super.numDeletesToMerge(info, pendingDeleteCount, readerSupplier);
+  }
+
   @Override
   public String toString() {
     return "NoMergePolicy";
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
index 3e06acad173..4851d45ccc4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
@@ -41,6 +41,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.TrackingDirectoryWrapper;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 
@@ -87,6 +88,10 @@ final class ReadersAndUpdates {
 
   final AtomicLong ramBytesUsed = new AtomicLong();
 
+  // if set to true the pending deletes must be marked as shared the next time the reader is
+  // returned from #getReader()
+  private boolean liveDocsSharedPending = false;
+
   ReadersAndUpdates(int indexCreatedVersionMajor, SegmentCommitInfo info,
                     PendingDeletes pendingDeletes) {
     this.info = info;
@@ -196,12 +201,15 @@ final class ReadersAndUpdates {
       // We steal returned ref:
       reader = new SegmentReader(info, indexCreatedVersionMajor, context);
       pendingDeletes.onNewReader(reader, info);
+    } else if (liveDocsSharedPending) {
+      markAsShared();
     }
+
     // Ref for caller
     reader.incRef();
     return reader;
   }
-  
+
   public synchronized void release(SegmentReader sr) throws IOException {
     assert info == sr.getSegmentInfo();
     sr.decRef();
@@ -221,6 +229,7 @@
       } finally {
         reader = null;
       }
+      liveDocsSharedPending = false;
     }
 
     decRef();
@@ -237,7 +246,7 @@
     }
     // force new liveDocs
     Bits liveDocs = pendingDeletes.getLiveDocs();
-    pendingDeletes.liveDocsShared();
+    markAsShared();
     if (liveDocs != null) {
       return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs,
           info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
@@ -249,6 +258,22 @@
     }
   }
 
+  synchronized int numDeletesToMerge(MergePolicy policy) throws IOException {
+    IOSupplier<CodecReader> readerSupplier = () -> {
+      if (this.reader == null) {
+        // get a reader and dec the ref right away; we just make sure we have a reader
+        getReader(IOContext.READ).decRef();
+      }
+      if (reader.getLiveDocs() != pendingDeletes.getLiveDocs()
+          || reader.numDeletedDocs() != info.getDelCount() - pendingDeletes.numPendingDeletes()) {
+        // we have a reader but its live-docs are out of sync; let's create a temporary one that we never share
+        swapNewReaderWithLatestLiveDocs();
+      }
+      return reader;
+    };
+    return policy.numDeletesToMerge(info, pendingDeletes.numPendingDeletes(), readerSupplier);
+  }
+
   public synchronized Bits getLiveDocs() {
     return pendingDeletes.getLiveDocs();
 
@@ -676,18 +701,7 @@
 
     // if there is a reader open, reopen it to reflect the updates
     if (reader != null) {
-      SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
-      boolean success2 = false;
-      try {
-        pendingDeletes.onNewReader(newReader, info);
-        reader.decRef();
-        reader = newReader;
-        success2 = true;
-      } finally {
-        if (success2 == false) {
-          newReader.decRef();
-        }
-      }
+      swapNewReaderWithLatestLiveDocs();
     }
 
     // writing field updates succeeded
@@ -713,6 +727,28 @@
     return true;
   }
 
+  private SegmentReader createNewReaderWithLatestLiveDocs(SegmentReader reader) throws IOException {
+    assert reader != null;
+    SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(),
+        info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
+    boolean success2 = false;
+    try {
+      pendingDeletes.onNewReader(newReader, info);
+      reader.decRef();
+      success2 = true;
+    } finally {
+      if (success2 == false) {
+        newReader.decRef();
+      }
+    }
+    return newReader;
+  }
+
+  private void swapNewReaderWithLatestLiveDocs() throws IOException {
+    reader = createNewReaderWithLatestLiveDocs(reader);
+    liveDocsSharedPending = true;
+  }
+
   synchronized public void setIsMerging() {
     // This ensures any newly resolved doc value updates while we are merging are
     // saved for re-applying after this segment is done merging:
@@ -743,26 +779,11 @@
 
     if (delCount != reader.numDeletedDocs()) {
       // beware of zombies:
       assert delCount > reader.numDeletedDocs(): "delCount=" + delCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs();
-      Bits liveDocs = pendingDeletes.getLiveDocs();
-      assert liveDocs != null;
-
-      // Create a new reader with the latest live docs:
-      SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - delCount);
-      boolean success = false;
-      try {
-        reader.decRef();
-        pendingDeletes.onNewReader(newReader, info);
-        success = true;
-      } finally {
-        if (success == false) {
-          newReader.close();
-        }
-      }
-      reader = newReader;
+      assert pendingDeletes.getLiveDocs() != null;
+      reader = createNewReaderWithLatestLiveDocs(reader);
     }
-    pendingDeletes.liveDocsShared();
-
+    markAsShared();
     assert verifyDocCounts();
 
     return reader;
@@ -794,5 +815,11 @@
   public synchronized boolean isFullyDeleted() {
     return pendingDeletes.isFullyDeleted();
   }
+
+  private final void markAsShared() {
+    assert Thread.holdsLock(this);
+    liveDocsSharedPending = false;
+    pendingDeletes.liveDocsShared(); // this is not costly; we can just call it even if it's already marked as shared
+  }
 
 }
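
The next file is where soft-delete retention is actually enforced. For orientation, this is how an application wires the pieces together (illustrative only, not part of the patch; the field name and the retention query are placeholders): the writer gets a soft-deletes field, its merge policy is wrapped in SoftDeletesRetentionMergePolicy, and documents are replaced via softUpdateDocument so old versions survive merges for as long as the retention query keeps matching them:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SoftDeleteRetentionExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriterConfig config = new IndexWriterConfig()
        .setSoftDeletesField("soft_delete")
        .setMergePolicy(new SoftDeletesRetentionMergePolicy("soft_delete",
            MatchAllDocsQuery::new, // placeholder: retain every soft-deleted doc; a real app supplies its retention query
            new TieredMergePolicy()));
    try (IndexWriter writer = new IndexWriter(dir, config)) {
      Document doc = new Document();
      doc.add(new StringField("id", "1", Field.Store.YES));
      writer.addDocument(doc);

      // soft-delete the old version instead of hard-deleting it; merges keep it alive
      // for as long as the retention query above still matches it
      Document update = new Document();
      update.add(new StringField("id", "1", Field.Store.YES));
      writer.softUpdateDocument(new Term("id", "1"), update,
          new NumericDocValuesField("soft_delete", 1));
      writer.commit();
    }
  }
}
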
diff --git a/lucene/core/src/java/org/apache/lucene/index/SoftDeletesRetentionMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/SoftDeletesRetentionMergePolicy.java
index b0887557bed..51578f98729 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SoftDeletesRetentionMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SoftDeletesRetentionMergePolicy.java
@@ -33,6 +33,7 @@ import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.IOSupplier;
 
 /**
  * This {@link MergePolicy} allows to carry over soft deleted documents across merges. The policy wraps
@@ -167,4 +168,30 @@ public final class SoftDeletesRetentionMergePolicy extends OneMergeWrappingMerge
         return numDocs;
       }
     };
-  }}
+  }
+
+  @Override
+  public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount, IOSupplier<CodecReader> readerSupplier) throws IOException {
+    int numDeletesToMerge = super.numDeletesToMerge(info, pendingDeleteCount, readerSupplier);
+    if (numDeletesToMerge != 0) {
+      final CodecReader reader = readerSupplier.get();
+      if (reader.getLiveDocs() != null) {
+        Scorer scorer = getScorer(field, retentionQuerySupplier.get(), wrapLiveDocs(reader, null, reader.maxDoc()));
+        if (scorer != null) {
+          DocIdSetIterator iterator = scorer.iterator();
+          Bits liveDocs = reader.getLiveDocs();
+          int numDeletedDocs = reader.numDeletedDocs();
+          while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+            if (liveDocs.get(iterator.docID()) == false) {
+              numDeletedDocs--;
+            }
+          }
+          return numDeletedDocs;
+        }
+      }
+    }
+    assert numDeletesToMerge >= 0 : "numDeletesToMerge: " + numDeletesToMerge;
+    assert numDeletesToMerge <= info.info.maxDoc() : "numDeletesToMerge: " + numDeletesToMerge + " maxDoc:" + info.info.maxDoc();
+    return numDeletesToMerge;
+  }
+}
diff --git a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
index cdfdea6e5af..bdd3a8a6ac4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -597,7 +597,7 @@ public class TieredMergePolicy extends MergePolicy {
     final List<SegmentCommitInfo> eligible = new ArrayList<>();
     final Set<SegmentCommitInfo> merging = writer.getMergingSegments();
     for(SegmentCommitInfo info : infos) {
-      double pctDeletes = 100.*((double) writer.numDeletedDocs(info))/info.info.maxDoc();
+      double pctDeletes = 100.*((double) writer.numDeletesToMerge(info))/info.info.maxDoc();
       if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
         eligible.add(info);
       }
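
The one-line TieredMergePolicy change above keeps forceMergeDeletes from repeatedly selecting segments whose deletes cannot be claimed yet: the percentage is now computed from numDeletesToMerge instead of the raw delete count. A tiny illustration with made-up numbers:

public class DeletePctExample {
  public static void main(String[] args) {
    int maxDoc = 1000;
    int numDeletedDocs = 300;   // soft-deleted docs that are all still retained by the retention query
    int numDeletesToMerge = 0;  // deletes a merge is actually allowed to claim right now

    // old input: 30.0, above a typical forceMergeDeletesPctAllowed of 10.0, so the segment
    // kept being selected even though the merge could not drop anything
    System.out.println(100. * ((double) numDeletedDocs) / maxDoc);

    // new input: 0.0, so the segment is not eligible until the retention query lets go
    System.out.println(100. * ((double) numDeletesToMerge) / maxDoc);
  }
}
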
diff --git a/lucene/core/src/java/org/apache/lucene/util/IOSupplier.java b/lucene/core/src/java/org/apache/lucene/util/IOSupplier.java
new file mode 100644
index 00000000000..04758278665
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/util/IOSupplier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util;
+
+import java.io.IOException;
+
+/**
+ * This is a result supplier that is allowed to throw an IOException.
+ *
+ * @see java.util.function.Supplier
+ * @param <T> the supplier's result type.
+ */
+@FunctionalInterface
+public interface IOSupplier<T> {
+
+  /**
+   * Gets the result.
+   * @return the result
+   * @throws IOException if producing the result throws an {@link IOException}
+   */
+  T get() throws IOException;
+}
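
IOSupplier itself is a small general-purpose addition: java.util.function.Supplier with a throws IOException clause, which is what lets ReadersAndUpdates hand the merge policy a lazily created CodecReader without wrapping checked exceptions. A minimal standalone usage sketch (the file name below is made up):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.lucene.util.IOSupplier;

public class IOSupplierExample {
  public static void main(String[] args) throws IOException {
    // unlike java.util.function.Supplier, the lambda body may throw IOException directly
    IOSupplier<byte[]> bytes = () -> Files.readAllBytes(Paths.get("some-file.bin"));

    // the potentially expensive read happens only when get() is called
    System.out.println(bytes.get().length);
  }
}
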
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
index 3d8ffe3b158..061d006f249 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
@@ -37,6 +38,7 @@ import org.apache.lucene.document.StringField;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
@@ -47,6 +49,60 @@ import org.apache.lucene.util.LuceneTestCase;
 
 public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
 
+  public void testForceMergeFullyDeleted() throws IOException {
+    Directory dir = newDirectory();
+    AtomicBoolean letItGo = new AtomicBoolean(false);
+    MergePolicy policy = new SoftDeletesRetentionMergePolicy("soft_delete",
+        () -> letItGo.get() ? new MatchNoDocsQuery() : new MatchAllDocsQuery(), new LogDocMergePolicy());
+    IndexWriterConfig indexWriterConfig = newIndexWriterConfig().setMergePolicy(policy)
+        .setSoftDeletesField("soft_delete");
+    IndexWriter writer = new IndexWriter(dir, indexWriterConfig);
+
+    Document doc = new Document();
+    doc.add(new StringField("id", "1", Field.Store.YES));
+    doc.add(new NumericDocValuesField("soft_delete", 1));
+    writer.addDocument(doc);
+    writer.commit();
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new NumericDocValuesField("soft_delete", 1));
+    writer.addDocument(doc);
+    DirectoryReader reader = writer.getReader();
+    {
+      assertEquals(2, reader.leaves().size());
+      final SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader();
+      assertTrue(policy.keepFullyDeletedSegment(segmentReader));
+      assertEquals(0, policy.numDeletesToMerge(segmentReader.getSegmentInfo(), 0, () -> segmentReader));
+    }
+    {
+      SegmentReader segmentReader = (SegmentReader) reader.leaves().get(1).reader();
+      assertTrue(policy.keepFullyDeletedSegment(segmentReader));
+      assertEquals(0, policy.numDeletesToMerge(segmentReader.getSegmentInfo(), 0, () -> segmentReader));
+      writer.forceMerge(1);
+      reader.close();
+    }
+    reader = writer.getReader();
+    {
+      assertEquals(1, reader.leaves().size());
+      SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader();
+      assertEquals(2, reader.maxDoc());
+      assertTrue(policy.keepFullyDeletedSegment(segmentReader));
+      assertEquals(0, policy.numDeletesToMerge(segmentReader.getSegmentInfo(), 0, () -> segmentReader));
+    }
+    writer.forceMerge(1); // make sure we don't merge this
+    assertNull(DirectoryReader.openIfChanged(reader));
+
+    writer.forceMergeDeletes(); // make sure we don't merge this
+    assertNull(DirectoryReader.openIfChanged(reader));
+    letItGo.set(true);
+    writer.forceMergeDeletes(); // now the retention query matches nothing, so the soft-deleted docs can be dropped
+    DirectoryReader directoryReader = DirectoryReader.openIfChanged(reader);
+    assertNotNull(directoryReader);
+    assertEquals(0, directoryReader.numDeletedDocs());
+    assertEquals(0, directoryReader.maxDoc());
+    IOUtils.close(directoryReader, reader, writer, dir);
+  }
+
   public void testKeepFullyDeletedSegments() throws IOException {
     Directory dir = newDirectory();
     IndexWriterConfig indexWriterConfig = newIndexWriterConfig();