LUCENE-10237 : Add MergeOnCommitTieredMergePolicy to sandbox (#446)

This commit is contained in:
Anand 2022-03-02 13:49:25 +05:30 committed by GitHub
parent 1b083ea039
commit 14726dec51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 222 additions and 0 deletions

View File

@ -41,6 +41,8 @@ Bug Fixes
Other
---------------------
* LUCENE-10237: Add MergeOnFlushMergePolicy to sandbox.
(Michael Froh, Anand Kotriwal)
* LUCENE-10283: The minimum required Java version was bumped from 11 to 17.
(Adrien Grand, Uwe Schindler, Dawid Weiss, Robert Muir)

View File

@ -25,6 +25,7 @@ module org.apache.lucene.sandbox {
exports org.apache.lucene.sandbox.document;
exports org.apache.lucene.sandbox.queries;
exports org.apache.lucene.sandbox.search;
exports org.apache.lucene.sandbox.index;
provides org.apache.lucene.codecs.PostingsFormat with
org.apache.lucene.sandbox.codecs.idversion.IDVersionPostingsFormat;

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.sandbox.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.*;
/**
 * A simple extension to wrap {@link MergePolicy} to merge all tiny segments (or at least segments
 * smaller than specified in {@link #setSmallSegmentThresholdMB(double)}) into one segment on
 * commit.
 *
 * <p>Only {@link #findFullFlushMerges(MergeTrigger, SegmentInfos, MergeContext)} is overridden
 * here; that method does not consult the wrapped policy.
 */
public class MergeOnFlushMergePolicy extends FilterMergePolicy {

  /** Segments strictly smaller than this many bytes are merge candidates; defaults to 100 MB. */
  private long smallSegmentThresholdBytes = Units.mbToBytes(100.0);

  /**
   * Creates a MergeOnFlushMergePolicy merge policy instance wrapping another.
   *
   * @param mergePolicy the wrapped {@link MergePolicy}
   */
  public MergeOnFlushMergePolicy(MergePolicy mergePolicy) {
    super(mergePolicy);
  }

  /**
   * Returns the current small-segment threshold, converted from the internally stored byte count
   * to megabytes.
   *
   * @return the threshold in megabytes
   */
  public double getSmallSegmentThresholdMB() {
    return Units.bytesToMB(smallSegmentThresholdBytes);
  }

  /**
   * Sets the size below which a segment is considered small.
   *
   * @param smallSegmentThresholdMB all segments smaller than this will be merged into a single
   *     segment before commit completes.
   * @throws IllegalArgumentException if {@code smallSegmentThresholdMB} is negative or NaN
   */
  public void setSmallSegmentThresholdMB(double smallSegmentThresholdMB) {
    // Written as a negated ">=" so that NaN (for which every comparison is false) is rejected too.
    if (!(smallSegmentThresholdMB >= 0.0)) {
      throw new IllegalArgumentException(
          "smallSegmentThresholdMB must be >= 0 (got " + smallSegmentThresholdMB + ")");
    }
    this.smallSegmentThresholdBytes = Units.mbToBytes(smallSegmentThresholdMB);
  }

  /**
   * Collects every segment whose {@link SegmentCommitInfo#sizeInBytes()} is strictly below the
   * configured threshold and that is not already listed in {@link
   * MergeContext#getMergingSegments()}, and proposes a single merge containing all of them.
   *
   * @param mergeTrigger the event that triggered the merge (not used by this implementation)
   * @param segmentInfos the segments to inspect
   * @param mergeContext supplies the set of segments that are already being merged
   * @return a specification holding exactly one {@link OneMerge} when two or more candidate
   *     segments were found, otherwise {@code null}
   * @throws IOException if a segment size cannot be determined
   */
  @Override
  public MergeSpecification findFullFlushMerges(
      MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
      throws IOException {
    List<SegmentCommitInfo> smallSegments = new ArrayList<>();
    for (SegmentCommitInfo sci : segmentInfos) {
      if (sci.sizeInBytes() < smallSegmentThresholdBytes
          && mergeContext.getMergingSegments().contains(sci) == false) {
        smallSegments.add(sci);
      }
    }
    // A merge needs at least two inputs; a lone small segment is left untouched.
    if (smallSegments.size() > 1) {
      MergeSpecification mergeSpecification = new MergeSpecification();
      mergeSpecification.add(new OneMerge(smallSegments));
      return mergeSpecification;
    }
    return null;
  }

  /** Utility class to handle conversion between megabytes and bytes */
  static class Units {

    private Units() {}

    /**
     * Converts a byte count to megabytes, using 1 MB = 1024 * 1024 bytes.
     *
     * @param bytes the size in bytes
     * @return the size in megabytes
     */
    public static double bytesToMB(long bytes) {
      return bytes / 1024. / 1024.;
    }

    /**
     * Converts megabytes to bytes, using 1 MB = 1024 * 1024 bytes. The fractional part of the
     * result is discarded by the cast to {@code long}.
     *
     * @param megabytes the size in megabytes
     * @return the size in whole bytes
     */
    public static long mbToBytes(double megabytes) {
      return (long) (megabytes * 1024 * 1024);
    }
  }
}

View File

@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Experimental index-related classes */
package org.apache.lucene.sandbox.index;

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.sandbox.index;
import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.tests.index.BaseMergePolicyTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.Version;
/** Test for {@link MergeOnFlushMergePolicy}. */
public class TestMergeOnFlushMergePolicy extends BaseMergePolicyTestCase {

  /**
   * Builds a {@link MergeOnFlushMergePolicy} around a randomly chosen delegate, copying the
   * delegate's compound-file settings and picking a random small-segment threshold between 1 and
   * 100 MB.
   */
  @Override
  protected MergePolicy mergePolicy() {
    Random rnd = random();
    MergePolicy delegate = newMergePolicy();
    MergeOnFlushMergePolicy policy = new MergeOnFlushMergePolicy(delegate);
    policy.setMaxCFSSegmentSizeMB(delegate.getMaxCFSSegmentSizeMB());
    policy.setNoCFSRatio(delegate.getNoCFSRatio());
    policy.setSmallSegmentThresholdMB(TestUtil.nextInt(rnd, 1, 100));
    if (delegate instanceof TieredMergePolicy) {
      TieredMergePolicy tiered = (TieredMergePolicy) delegate;
      tiered.setMaxMergedSegmentMB(TestUtil.nextInt(random(), 1024, 10 * 1024));
    }
    return policy;
  }

  /** Intentionally empty: this test class performs no per-index segment assertions. */
  @Override
  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) {}

  /** Intentionally empty: this test class performs no per-merge assertions. */
  @Override
  protected void assertMerge(MergePolicy policy, MergePolicy.MergeSpecification merge) {}

  /**
   * Feeds many randomly generated segment lists to {@code findFullFlushMerges} and checks that a
   * returned merge only contains small, not-yet-merging segments, and that a {@code null} result
   * implies at most one such segment existed.
   */
  public void testFindFullFlushMerges() throws IOException {
    MergeOnFlushMergePolicy policy = (MergeOnFlushMergePolicy) mergePolicy();
    double thresholdMB = policy.getSmallSegmentThresholdMB();
    // The threshold does not change during the test, so convert it once up front.
    long thresholdBytes = MergeOnFlushMergePolicy.Units.mbToBytes(thresholdMB);
    Random rnd = random();

    for (int iter = 0; iter < 10_000; iter++) {
      SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
      int segmentCount = random().nextInt(50);
      Set<SegmentCommitInfo> alreadyMerging = new HashSet<>();
      Set<SegmentCommitInfo> belowThreshold = new HashSet<>();

      for (int seg = 0; seg < segmentCount; seg++) {
        SegmentCommitInfo info =
            makeSegmentCommitInfo(
                "_" + seg,
                TestUtil.nextInt(rnd, 10, 100),
                rnd.nextInt(10),
                rnd.nextDouble() * 2.0 * thresholdMB,
                IndexWriter.SOURCE_FLUSH);
        if (info.sizeInBytes() < thresholdBytes) {
          belowThreshold.add(info);
        }
        // Randomly mark the segment as already taking part in another merge.
        if (rnd.nextBoolean()) {
          alreadyMerging.add(info);
        }
        infos.add(info);
      }

      MockMergeContext context = new MockMergeContext(SegmentCommitInfo::getDelCount);
      context.setMergingSegments(alreadyMerging);
      MergePolicy.MergeSpecification spec =
          policy.findFullFlushMerges(MergeTrigger.COMMIT, infos, context);

      if (spec == null) {
        // If we didn't compute a merge, then we have at most one small segment not already
        // participating in a merge.
        int idleSmallSegments = 0;
        for (SegmentCommitInfo small : belowThreshold) {
          if (alreadyMerging.contains(small) == false) {
            idleSmallSegments++;
            assertTrue(
                "If no merges, then at most one small segment is not already merging",
                idleSmallSegments <= 1);
          }
        }
        continue;
      }

      for (MergePolicy.OneMerge oneMerge : spec.merges) {
        for (SegmentCommitInfo merged : oneMerge.segments) {
          assertTrue("Merges only contain small segments", belowThreshold.contains(merged));
          assertFalse(
              "findFullFlushMerges must not return already merging segments",
              alreadyMerging.contains(merged));
        }
      }
    }
  }
}