From 14726dec5168ccaf14065e14e7d7f3ee22f186f0 Mon Sep 17 00:00:00 2001 From: Anand Date: Wed, 2 Mar 2022 13:49:25 +0530 Subject: [PATCH] LUCENE-10237 : Add MergeOnCommitTieredMergePolicy to sandbox (#446) --- lucene/CHANGES.txt | 2 + lucene/sandbox/src/java/module-info.java | 1 + .../index/MergeOnFlushMergePolicy.java | 85 +++++++++++++ .../lucene/sandbox/index/package-info.java | 19 +++ .../index/TestMergeOnFlushMergePolicy.java | 115 ++++++++++++++++++ 5 files changed, 222 insertions(+) create mode 100644 lucene/sandbox/src/java/org/apache/lucene/sandbox/index/MergeOnFlushMergePolicy.java create mode 100644 lucene/sandbox/src/java/org/apache/lucene/sandbox/index/package-info.java create mode 100644 lucene/sandbox/src/test/org/apache/lucene/sandbox/index/TestMergeOnFlushMergePolicy.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index cc41cbb2118..f25a8b49ccd 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -41,6 +41,8 @@ Bug Fixes Other --------------------- +* LUCENE-10237: Add MergeOnFlushMergePolicy to sandbox. + (Michael Froh, Anand Kotriwal) * LUCENE-10283: The minimum required Java version was bumped from 11 to 17. (Adrien Grand, Uwe Schindler, Dawid Weiss, Robert Muir) diff --git a/lucene/sandbox/src/java/module-info.java b/lucene/sandbox/src/java/module-info.java index 6d195445fbe..c51a25691ef 100644 --- a/lucene/sandbox/src/java/module-info.java +++ b/lucene/sandbox/src/java/module-info.java @@ -25,6 +25,7 @@ module org.apache.lucene.sandbox { exports org.apache.lucene.sandbox.document; exports org.apache.lucene.sandbox.queries; exports org.apache.lucene.sandbox.search; + exports org.apache.lucene.sandbox.index; provides org.apache.lucene.codecs.PostingsFormat with org.apache.lucene.sandbox.codecs.idversion.IDVersionPostingsFormat; diff --git a/lucene/sandbox/src/java/org/apache/lucene/sandbox/index/MergeOnFlushMergePolicy.java b/lucene/sandbox/src/java/org/apache/lucene/sandbox/index/MergeOnFlushMergePolicy.java new file mode 100644 index 00000000000..53e4d9941e4 --- /dev/null +++ b/lucene/sandbox/src/java/org/apache/lucene/sandbox/index/MergeOnFlushMergePolicy.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.sandbox.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.lucene.index.*; + +/** + * A simple extension to wrap {@link MergePolicy} to merge all tiny segments (or at least segments + * smaller than specified in {@link #setSmallSegmentThresholdMB(double)} into one segment on commit. + */ +public class MergeOnFlushMergePolicy extends FilterMergePolicy { + private long smallSegmentThresholdBytes = Units.mbToBytes(100.0); + + /** + * Creates a MergeOnFlushMergePolicy merge policy instance wrapping another. + * + * @param mergePolicy the wrapped {@link MergePolicy} + */ + public MergeOnFlushMergePolicy(MergePolicy mergePolicy) { + super(mergePolicy); + } + + public double getSmallSegmentThresholdMB() { + return Units.bytesToMB(smallSegmentThresholdBytes); + } + + /** + * @param smallSegmentThresholdMB all segments smaller than this will be merged into a single + * segment before commit completes. + */ + public void setSmallSegmentThresholdMB(double smallSegmentThresholdMB) { + this.smallSegmentThresholdBytes = Units.mbToBytes(smallSegmentThresholdMB); + } + + @Override + public MergeSpecification findFullFlushMerges( + MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) + throws IOException { + List smallSegments = new ArrayList<>(); + for (SegmentCommitInfo sci : segmentInfos) { + if (sci.sizeInBytes() < smallSegmentThresholdBytes) { + if (mergeContext.getMergingSegments().contains(sci) == false) { + smallSegments.add(sci); + } + } + } + if (smallSegments.size() > 1) { + MergeSpecification mergeSpecification = new MergeSpecification(); + mergeSpecification.add(new OneMerge(smallSegments)); + return mergeSpecification; + } + return null; + } + /** Utility class to handle conversion between megabytes and bytes */ + static class Units { + + private Units() {} + + public static double bytesToMB(long bytes) { + return bytes / 1024. / 1024.; + } + + public static long mbToBytes(double megabytes) { + return (long) (megabytes * 1024 * 1024); + } + } +} diff --git a/lucene/sandbox/src/java/org/apache/lucene/sandbox/index/package-info.java b/lucene/sandbox/src/java/org/apache/lucene/sandbox/index/package-info.java new file mode 100644 index 00000000000..186ae0b0097 --- /dev/null +++ b/lucene/sandbox/src/java/org/apache/lucene/sandbox/index/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Experimental index-related classes */ +package org.apache.lucene.sandbox.index; diff --git a/lucene/sandbox/src/test/org/apache/lucene/sandbox/index/TestMergeOnFlushMergePolicy.java b/lucene/sandbox/src/test/org/apache/lucene/sandbox/index/TestMergeOnFlushMergePolicy.java new file mode 100644 index 00000000000..12ba8d57585 --- /dev/null +++ b/lucene/sandbox/src/test/org/apache/lucene/sandbox/index/TestMergeOnFlushMergePolicy.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.sandbox.index; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.TieredMergePolicy; +import org.apache.lucene.tests.index.BaseMergePolicyTestCase; +import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.Version; + +/** Test for {@link MergeOnFlushMergePolicy}. */ +public class TestMergeOnFlushMergePolicy extends BaseMergePolicyTestCase { + + @Override + protected MergePolicy mergePolicy() { + Random r = random(); + MergePolicy mergePolicy = newMergePolicy(); + MergeOnFlushMergePolicy mergeOnFlushPolicy = new MergeOnFlushMergePolicy(mergePolicy); + mergeOnFlushPolicy.setMaxCFSSegmentSizeMB(mergePolicy.getMaxCFSSegmentSizeMB()); + mergeOnFlushPolicy.setNoCFSRatio(mergePolicy.getNoCFSRatio()); + mergeOnFlushPolicy.setSmallSegmentThresholdMB(TestUtil.nextInt(r, 1, 100)); + if (mergePolicy instanceof TieredMergePolicy) { + ((TieredMergePolicy) mergePolicy) + .setMaxMergedSegmentMB(TestUtil.nextInt(random(), 1024, 10 * 1024)); + } + return mergeOnFlushPolicy; + } + + @Override + protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) {} + + @Override + protected void assertMerge(MergePolicy policy, MergePolicy.MergeSpecification merge) {} + + public void testFindFullFlushMerges() throws IOException { + MergeOnFlushMergePolicy mergePolicy = (MergeOnFlushMergePolicy) mergePolicy(); + double smallSegmentThresholdMB = mergePolicy.getSmallSegmentThresholdMB(); + Random r = random(); + + for (int j = 0; j < 10_000; j++) { + SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); + int numSegs = random().nextInt(50); + Set mergingSegments = new HashSet<>(); + Set smallSegments = new HashSet<>(); + for (int i = 0; i < numSegs; i++) { + SegmentCommitInfo sci = + makeSegmentCommitInfo( + "_" + i, + TestUtil.nextInt(r, 10, 100), + r.nextInt(10), + r.nextDouble() * 2.0 * smallSegmentThresholdMB, + IndexWriter.SOURCE_FLUSH); + if (sci.sizeInBytes() < MergeOnFlushMergePolicy.Units.mbToBytes(smallSegmentThresholdMB)) { + smallSegments.add(sci); + } + if (r.nextBoolean()) { + mergingSegments.add(sci); + } + segmentInfos.add(sci); + } + MockMergeContext context = new MockMergeContext(SegmentCommitInfo::getDelCount); + context.setMergingSegments(mergingSegments); + MergePolicy.MergeSpecification mergeSpecification; + + mergeSpecification = + mergePolicy.findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, context); + + if (mergeSpecification == null) { + // If we didn't compute a merge, then we have at most one small segment not already + // participating in a merge. + boolean foundNonMergingSmallSegment = false; + for (SegmentCommitInfo smallSegment : smallSegments) { + if (!mergingSegments.contains(smallSegment)) { + assertFalse( + "If no merges, then at most one small segment is not already merging", + foundNonMergingSmallSegment); + foundNonMergingSmallSegment = true; + } + } + } else { + for (MergePolicy.OneMerge oneMerge : mergeSpecification.merges) { + for (SegmentCommitInfo sci : oneMerge.segments) { + assertTrue("Merges only contain small segments", smallSegments.contains(sci)); + assertFalse( + "findFullFlushMerges must not return already merging segments", + mergingSegments.contains(sci)); + } + } + } + } + } +}