Enable merge on refresh and merge on commit in OpenSearch (#2535)

Enables merge on refresh and merge on commit in OpenSearch by
way of two new index options:
index.merge_on_flush.max_full_flush_merge_wait_time and
index.merge_on_flush.enabled. By default, merge_on_flush is disabled
and the wait time is 10s.

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Andriy Redko, 2022-03-25 14:45:34 -04:00 (committed via GitHub)
commit 908682d437 (parent cc0e66b1dc)
4 changed files with 274 additions and 0 deletions
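
For context (not part of the diff): a minimal sketch of what opting in to the two new options looks like. The builder usage mirrors the tests added in this commit; the class name and the 30s value are illustrative only.

```java
import org.opensearch.common.settings.Settings;

// Hypothetical helper class, for illustration only.
public class MergeOnFlushOptIn {
    public static void main(String[] args) {
        // Enable merge-on-flush and raise the wait time from the 10s default to 30s.
        // Both settings are dynamic and index-scoped, so they can be supplied at
        // index creation or through an update-settings request.
        Settings settings = Settings.builder()
            .put("index.merge_on_flush.enabled", true)                         // default: false
            .put("index.merge_on_flush.max_full_flush_merge_wait_time", "30s") // default: 10s
            .build();
        System.out.println(settings);
    }
}
```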

File: IndexScopedSettings.java

@@ -187,6 +187,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.FINAL_PIPELINE,
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {

File: IndexSettings.java

@@ -503,6 +503,27 @@ public final class IndexSettings {
Setting.Property.IndexScope
);
/**
* Expert: sets the amount of time to wait for merges (during {@link org.apache.lucene.index.IndexWriter#commit}
* or {@link org.apache.lucene.index.IndexWriter#getReader(boolean, boolean)}) returned by MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point. The merges are not
* aborted, and will still run to completion independent of the commit or getReader call, like natural segment merges.
*/
public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
"index.merge_on_flush.max_full_flush_merge_wait_time",
new TimeValue(10, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope
);
public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
"index.merge_on_flush.enabled",
false,
Property.IndexScope,
Property.Dynamic
);
private final Index index;
private final Version version;
private final Logger logger;
@@ -584,6 +605,15 @@ public final class IndexSettings {
*/
private volatile int maxRegexLength;
/**
* The max amount of time to wait for merges
*/
private volatile TimeValue maxFullFlushMergeWaitTime;
/**
* Is merge on flush enabled or not
*/
private volatile boolean mergeOnFlushEnabled;
/**
* Returns the default search fields for this index.
*/
@@ -696,6 +726,8 @@ public final class IndexSettings {
mappingTotalFieldsLimit = scopedSettings.get(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING);
mappingDepthLimit = scopedSettings.get(INDEX_MAPPING_DEPTH_LIMIT_SETTING);
mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
@@ -765,6 +797,8 @@ public final class IndexSettings {
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING, this::setMappingTotalFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
}
private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1328,4 +1362,20 @@ public final class IndexSettings {
private void setMappingFieldNameLengthLimit(long value) {
this.mappingFieldNameLengthLimit = value;
}
private void setMaxFullFlushMergeWaitTime(TimeValue timeValue) {
this.maxFullFlushMergeWaitTime = timeValue;
}
private void setMergeOnFlushEnabled(boolean enabled) {
this.mergeOnFlushEnabled = enabled;
}
public TimeValue getMaxFullFlushMergeWaitTime() {
return this.maxFullFlushMergeWaitTime;
}
public boolean isMergeOnFlushEnabled() {
return mergeOnFlushEnabled;
}
}
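
A small sketch of reading the two values back through the getters added above, the same way the engine change below does. Assumptions: the IndexSettingsModule test helper (the one used by the new tests) and the org.opensearch.* package locations shown here; the index name and 30s value are illustrative.

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.test.IndexSettingsModule; // test-framework helper, as used in the new tests

public class MergeOnFlushRead {
    public static void main(String[] args) {
        // Build IndexSettings for a hypothetical index with merge-on-flush enabled.
        IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
            "demo-index",
            Settings.builder()
                .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true)
                .put(
                    IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(),
                    TimeValue.timeValueSeconds(30)
                )
                .build()
        );
        // Consumers query the new getters rather than re-parsing the raw settings.
        boolean enabled = indexSettings.isMergeOnFlushEnabled();           // true
        TimeValue waitTime = indexSettings.getMaxFullFlushMergeWaitTime(); // 30s
        System.out.println(enabled + " / " + waitTime);
    }
}
```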

File: InternalEngine.java

@@ -50,6 +50,7 @@ import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
@@ -2425,6 +2426,21 @@ public class InternalEngine extends Engine {
// to enable it.
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
}
if (config().getIndexSettings().isMergeOnFlushEnabled()) {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
} else {
logger.warn(
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
);
}
}
iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
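
For readers less familiar with the Lucene side, a standalone sketch (plain Lucene, not OpenSearch code; requires the lucene-sandbox module) of the same wiring the engine change above performs: bound the full-flush merge wait on the IndexWriterConfig and wrap the base policy in MergeOnFlushMergePolicy so that findFullFlushMerges(...) proposes merges of freshly flushed segments.

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.store.ByteBuffersDirectory;

public class MergeOnFlushLuceneSketch {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        // Wait up to 10s (the new OpenSearch default) for merges triggered by a
        // full flush before commit()/getReader() returns.
        iwc.setMaxFullFlushMergeWaitMillis(10_000);
        // Wrap the regular policy; without the wrapper, findFullFlushMerges(...)
        // proposes no merges and the wait time has no effect.
        iwc.setMergePolicy(new MergeOnFlushMergePolicy(new TieredMergePolicy()));
        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), iwc)) {
            writer.commit(); // merges of just-flushed segments may complete before this returns
        }
    }
}
```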

File: InternalEngineTests.java

@@ -494,6 +494,212 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testMergeSegmentsOnCommitIsDisabled() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
assertThat(engine.segments(false), empty());
int numDocsFirstSegment = randomIntBetween(5, 50);
Set<String> liveDocsFirstSegment = new HashSet<>();
for (int i = 0; i < numDocsFirstSegment; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocsFirstSegment.add(id);
}
engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertFalse(segments.get(0).committed);
int deletes = 0;
int updates = 0;
int appends = 0;
int iterations = scaledRandomIntBetween(1, 50);
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
String idToUpdate = randomFrom(liveDocsFirstSegment);
liveDocsFirstSegment.remove(idToUpdate);
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
deletes++;
} else {
engine.index(indexForDoc(doc));
updates++;
}
if (randomBoolean()) {
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
appends++;
}
}
boolean committed = randomBoolean();
if (committed) {
engine.flush();
}
engine.refresh("test");
segments = engine.segments(randomBoolean());
assertThat(segments, hasSize(2));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(updates + deletes));
assertThat(segments.get(0).committed, equalTo(committed));
assertThat(segments.get(1).getNumDocs(), equalTo(updates + appends));
assertThat(segments.get(1).getDeletedDocs(), equalTo(deletes)); // delete tombstones
assertThat(segments.get(1).committed, equalTo(committed));
}
}
public void testMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
assertThat(engine.segments(false), empty());
int numDocsFirstSegment = randomIntBetween(5, 50);
Set<String> liveDocsFirstSegment = new HashSet<>();
for (int i = 0; i < numDocsFirstSegment; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocsFirstSegment.add(id);
}
engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertFalse(segments.get(0).committed);
int deletes = 0;
int updates = 0;
int appends = 0;
int iterations = scaledRandomIntBetween(1, 50);
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
String idToUpdate = randomFrom(liveDocsFirstSegment);
liveDocsFirstSegment.remove(idToUpdate);
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
deletes++;
} else {
engine.index(indexForDoc(doc));
updates++;
}
if (randomBoolean()) {
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
appends++;
}
}
boolean committed = randomBoolean();
if (committed) {
engine.flush();
}
engine.refresh("test");
segments = engine.segments(randomBoolean());
// All segments have to be merged into one
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(numDocsFirstSegment + appends - deletes));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).committed, equalTo(committed));
}
}
// this test writes documents to the engine while concurrently flushing/committing
public void testConcurrentMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
final int numIndexingThreads = scaledRandomIntBetween(3, 8);
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
final List<Thread> indexingThreads = new ArrayList<>();
final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
// create N indexing threads to index documents simultaneously
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
final int threadIdx = threadNum;
Thread indexingThread = new Thread(() -> {
try {
barrier.await(); // wait for all threads to start at the same time
// index random number of docs
for (int i = 0; i < numDocsPerThread; i++) {
final String id = "thread" + threadIdx + "#" + i;
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
doneLatch.countDown();
}
});
indexingThreads.add(indexingThread);
}
// start the indexing threads
for (Thread thread : indexingThreads) {
thread.start();
}
barrier.await(); // wait for indexing threads to all be ready to start
assertThat(doneLatch.await(10, TimeUnit.SECONDS), is(true));
boolean committed = randomBoolean();
if (committed) {
engine.flush();
}
engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());
// All segments have to be merged into one
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(numIndexingThreads * numDocsPerThread));
assertThat(segments.get(0).committed, equalTo(committed));
}
}
public void testCommitStats() throws IOException {
final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);