Ensure pending merges are updated on segment flushes

Because `async_merge` defaulted to `true`, we never ran the merge
policy on a segment flush. This prevented the pending merges from
being updated, so actual pending merges did not contribute to the
merge decision.
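
For context, each merge policy provider below wrapped its policy in an `EnableMerge*` subclass that returned no merges for flush-triggered calls. A condensed sketch of that pattern against the Lucene 4.x `MergePolicy` API (the class name here is invented for illustration):

```java
import java.io.IOException;

import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentInfos;

// Hypothetical condensation of the EnableMerge* policies removed below.
public class SuppressFlushMergesPolicy extends LogByteSizeMergePolicy {

    @Override
    public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
        if (trigger == MergeTrigger.SEGMENT_FLUSH) {
            // Returning null on flush means IndexWriter never registers the
            // merges this flush made eligible, so they are absent from the
            // pending-merges bookkeeping that later merge decisions consult.
            return null;
        }
        return super.findMerges(trigger, infos);
    }
}
```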

This commit also removes the `index.merge.async` setting, which is
misleading: merges have been kept off the indexing threads at a
different level (the merge scheduler) since 1.1.
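
A rough sketch of what handling this at the scheduler level means, against the plain Lucene 4.x API rather than the actual Elasticsearch wiring: with a `ConcurrentMergeScheduler` installed, `IndexWriter` hands the merges selected by the merge policy to dedicated background threads, so indexing threads never execute them.

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class SchedulerLevelMerges {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_47, new StandardAnalyzer(Version.LUCENE_47));
        // merges are picked by the merge policy but executed by scheduler threads
        conf.setMergeScheduler(new ConcurrentMergeScheduler());
        try (IndexWriter writer = new IndexWriter(new RAMDirectory(), conf)) {
            // documents indexed here never pay for merge execution on this thread
        }
    }
}
```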

This commit also adds an additional check for when to run a refresh,
since relying solely on the dirty flag might leave merges un-refreshed,
which can cause search slowdowns and higher memory consumption.
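
The check itself appears in the first hunk below; isolated, the idea is simply this (a minimal sketch, assuming the Lucene 4.x `SearcherManager` API):

```java
import java.io.IOException;

import org.apache.lucene.search.SearcherManager;

class RefreshCheck {
    // A finished background merge commits new segments without touching the
    // dirty flag, but it does make the current searcher stale - which
    // isSearcherCurrent() reports.
    static boolean refreshNeeded(boolean dirty, SearcherManager searcherManager) throws IOException {
        return dirty || !searcherManager.isSearcherCurrent();
    }
}
```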

Closes #5779
Simon Willnauer 2014-04-11 15:40:07 +02:00
parent e0fbd5df52
commit 5d611a9098
7 changed files with 129 additions and 96 deletions

InternalEngine.java

@@ -674,7 +674,15 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
     @Override
     public boolean refreshNeeded() {
-        return dirty;
+        try {
+            // we are either dirty due to a document added or due to a
+            // finished merge - either way we should refresh
+            return dirty || !searcherManager.isSearcherCurrent();
+        } catch (IOException e) {
+            logger.error("failed to access searcher manager", e);
+            failEngine(e);
+            throw new EngineException(shardId, "failed to access searcher manager",e);
+        }
     }
 
     @Override
@@ -706,7 +714,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
             // but, we want to make sure not to loose ant refresh calls, if one is taking time
             synchronized (refreshMutex) {
-                if (dirty || refresh.force()) {
+                if (refreshNeeded() || refresh.force()) {
                     // we set dirty to false, even though the refresh hasn't happened yet
                     // as the refresh only holds for data indexed before it. Any data indexed during
                     // the refresh will not be part of it and will set the dirty flag back to true
@@ -926,7 +934,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
     @Override
     public void maybeMerge() throws EngineException {
-        if (!possibleMergeNeeded) {
+        if (!possibleMergeNeeded()) {
            return;
        }
        possibleMergeNeeded = false;
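
As an aside on the `maybeRefresh` comment in the hunk above: a sketch of the `SearcherManager` contract the engine relies on (Lucene 4.x API; the class and method names here are invented for illustration). `maybeRefresh()` lets exactly one caller perform the refresh while concurrent callers return immediately, which is why the engine adds its own mutex and dirty-flag bookkeeping on top.

```java
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;

public class RefreshSketch {
    public static void searchFresh(IndexWriter writer) throws Exception {
        SearcherManager manager = new SearcherManager(writer, true, null);
        // returns false when another thread won the race and is refreshing;
        // that caller "passed through" without waiting
        boolean iRefreshed = manager.maybeRefresh();
        IndexSearcher searcher = manager.acquire();
        try {
            // search against a point-in-time view that includes the refresh,
            // if one completed
        } finally {
            manager.release(searcher);
        }
    }
}
```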

LogByteSizeMergePolicyProvider.java

@@ -21,7 +21,6 @@ package org.elasticsearch.index.merge.policy;
 
 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
@@ -31,7 +30,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;
 
-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
@@ -41,13 +39,14 @@ import java.util.concurrent.CopyOnWriteArraySet;
 public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider<LogByteSizeMergePolicy> {
 
     private final IndexSettingsService indexSettingsService;
+    public static final String MAX_MERGE_BYTE_SIZE_KEY = "index.merge.policy.max_merge_sizes";
+    public static final String MIN_MERGE_BYTE_SIZE_KEY = "index.merge.policy.min_merge_size";
+    public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
 
     private volatile ByteSizeValue minMergeSize;
     private volatile ByteSizeValue maxMergeSize;
     private volatile int mergeFactor;
     private volatile int maxMergeDocs;
     private final boolean calibrateSizeByDeletes;
-    private boolean asyncMerge;
 
     private final Set<CustomLogByteSizeMergePolicy> policies = new CopyOnWriteArraySet<>();
@@ -63,21 +62,15 @@ public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider<
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR);
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
-        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
-                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
+        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
+                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes);
 
         indexSettingsService.addListener(applySettings);
     }
 
     @Override
     public LogByteSizeMergePolicy newMergePolicy() {
-        CustomLogByteSizeMergePolicy mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeLogByteSizeMergePolicy(this);
-        } else {
-            mergePolicy = new CustomLogByteSizeMergePolicy(this);
-        }
+        final CustomLogByteSizeMergePolicy mergePolicy = new CustomLogByteSizeMergePolicy(this);
         mergePolicy.setMinMergeMB(minMergeSize.mbFrac());
         mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac());
         mergePolicy.setMergeFactor(mergeFactor);
@@ -173,19 +166,4 @@ public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider<
         }
     }
-
-    public static class EnableMergeLogByteSizeMergePolicy extends CustomLogByteSizeMergePolicy {
-
-        public EnableMergeLogByteSizeMergePolicy(LogByteSizeMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-    }
 }

LogDocMergePolicyProvider.java

@@ -20,8 +20,6 @@
 package org.elasticsearch.index.merge.policy;
 
 import org.apache.lucene.index.LogDocMergePolicy;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
@@ -29,7 +27,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;
 
-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
@@ -39,12 +36,13 @@ import java.util.concurrent.CopyOnWriteArraySet;
 public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDocMergePolicy> {
 
     private final IndexSettingsService indexSettingsService;
+    public static final String MAX_MERGE_DOCS_KEY = "index.merge.policy.max_merge_docs";
+    public static final String MIN_MERGE_DOCS_KEY = "index.merge.policy.min_merge_docs";
+    public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
 
     private volatile int minMergeDocs;
     private volatile int maxMergeDocs;
     private volatile int mergeFactor;
     private final boolean calibrateSizeByDeletes;
-    private boolean asyncMerge;
 
     private final Set<CustomLogDocMergePolicy> policies = new CopyOnWriteArraySet<>();
@@ -60,9 +58,8 @@ public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDo
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
-        logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
-                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
+        logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
+                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes);
 
         indexSettingsService.addListener(applySettings);
     }
@@ -74,12 +71,7 @@ public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDo
 
     @Override
     public LogDocMergePolicy newMergePolicy() {
-        CustomLogDocMergePolicy mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeLogDocMergePolicy(this);
-        } else {
-            mergePolicy = new CustomLogDocMergePolicy(this);
-        }
+        final CustomLogDocMergePolicy mergePolicy = new CustomLogDocMergePolicy(this);
         mergePolicy.setMinMergeDocs(minMergeDocs);
         mergePolicy.setMaxMergeDocs(maxMergeDocs);
         mergePolicy.setMergeFactor(mergeFactor);
@@ -150,27 +142,4 @@ public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDo
             provider.policies.remove(this);
         }
     }
-
-    public static class EnableMergeLogDocMergePolicy extends CustomLogDocMergePolicy {
-
-        public EnableMergeLogDocMergePolicy(LogDocMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-
-        @Override
-        public MergePolicy clone() {
-            // Lucene IW makes a clone internally but since we hold on to this instance
-            // the clone will just be the identity.
-            return this;
-        }
-    }
 }

TieredMergePolicyProvider.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.index.merge.policy;
 
 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.inject.Inject;
@@ -30,7 +29,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;
 
-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
@@ -47,7 +45,6 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<Tiere
     private volatile ByteSizeValue maxMergedSegment;
     private volatile double segmentsPerTier;
     private volatile double reclaimDeletesWeight;
-    private boolean asyncMerge;
 
     private final ApplySettings applySettings = new ApplySettings();
@@ -57,7 +54,6 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<Tiere
     public TieredMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
         super(store);
         this.indexSettingsService = indexSettingsService;
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
         this.forceMergeDeletesPctAllowed = componentSettings.getAsDouble("expunge_deletes_allowed", 10d); // percentage
         this.floorSegment = componentSettings.getAsBytesSize("floor_segment", new ByteSizeValue(2, ByteSizeUnit.MB));
         this.maxMergeAtOnce = componentSettings.getAsInt("max_merge_at_once", 10);
@@ -69,8 +65,8 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<Tiere
         fixSettingsIfNeeded();
-        logger.debug("using [tiered] merge policy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}], async_merge[{}]",
-                forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight, asyncMerge);
+        logger.debug("using [tiered] merge policy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
+                forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight);
 
         indexSettingsService.addListener(applySettings);
     }
@@ -91,12 +87,7 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<Tiere
 
     @Override
     public TieredMergePolicy newMergePolicy() {
-        CustomTieredMergePolicyProvider mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeTieredMergePolicyProvider(this);
-        } else {
-            mergePolicy = new CustomTieredMergePolicyProvider(this);
-        }
+        final CustomTieredMergePolicyProvider mergePolicy = new CustomTieredMergePolicyProvider(this);
         mergePolicy.setNoCFSRatio(noCFSRatio);
         mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
         mergePolicy.setFloorSegmentMB(floorSegment.mbFrac());
@@ -222,20 +213,4 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<Tiere
             return this;
         }
     }
-
-    public static class EnableMergeTieredMergePolicyProvider extends CustomTieredMergePolicyProvider {
-
-        public EnableMergeTieredMergePolicyProvider(TieredMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergePolicy.MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-    }
 }

InternalEngineIntegrationTest.java

@@ -152,4 +152,5 @@ public class InternalEngineIntegrationTest extends ElasticsearchIntegrationTest
         assertThat(total, Matchers.equalTo(t));
     }
 }

InternalEngineMergeTests.java (new file)

@@ -0,0 +1,94 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.engine.internal;

import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import com.google.common.base.Predicate;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;

/**
 */
@ElasticsearchIntegrationTest.ClusterScope(numNodes = 1, scope = ElasticsearchIntegrationTest.Scope.SUITE)
public class InternalEngineMergeTests extends ElasticsearchIntegrationTest {

    @Test
    @LuceneTestCase.Slow
    public void testMergesHappening() throws InterruptedException, IOException, ExecutionException {
        final int numOfShards = 5;
        // some settings to keep num segments low
        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numOfShards)
                .put(LogDocMergePolicyProvider.MIN_MERGE_DOCS_KEY, 10)
                .put(LogDocMergePolicyProvider.MERGE_FACTORY_KEY, 5)
                .put(LogByteSizeMergePolicy.DEFAULT_MIN_MERGE_MB, 0.5)
                .build()));
        long id = 0;
        final int rounds = scaledRandomIntBetween(50, 300);
        logger.info("Starting rounds [{}] ", rounds);
        for (int i = 0; i < rounds; ++i) {
            final int numDocs = scaledRandomIntBetween(100, 1000);
            BulkRequestBuilder request = client().prepareBulk();
            for (int j = 0; j < numDocs; ++j) {
                request.add(Requests.indexRequest("test").type("type1").id(Long.toString(id++)).source(jsonBuilder().startObject().field("l", randomLong()).endObject()));
            }
            BulkResponse response = request.execute().actionGet();
            refresh();
            assertNoFailures(response);
            IndicesStatsResponse stats = client().admin().indices().prepareStats("test").setSegments(true).setMerge(true).get();
            logger.info("index round [{}] - segments {}, total merges {}, current merge {}", i, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
        }
        awaitBusy(new Predicate<Object>() {
            @Override
            public boolean apply(Object input) {
                IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
                logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
                long current = stats.getPrimaries().getMerge().getCurrent();
                long count = stats.getPrimaries().getSegments().getCount();
                return count < 50 && current == 0;
            }
        });
        IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
        logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
        long count = stats.getPrimaries().getSegments().getCount();
        assertThat(count, Matchers.lessThanOrEqualTo(50l));
    }
}

ElasticsearchAssertions.java

@@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
@@ -202,6 +203,12 @@ public class ElasticsearchAssertions {
         assertVersionSerializable(searchResponse);
     }
 
+    public static void assertNoFailures(BulkResponse response) {
+        assertThat("Unexpected ShardFailures: " + response.buildFailureMessage(),
+                response.hasFailures(), is(false));
+        assertVersionSerializable(response);
+    }
+
     public static void assertFailures(SearchRequestBuilder searchRequestBuilder, RestStatus restStatus, Matcher<String> reasonMatcher) {
         //when the number for shards is randomized and we expect failures
         //we can either run into partial or total failures depending on the current number of shards
@@ -513,4 +520,5 @@ public class ElasticsearchAssertions {
             MockDirectoryHelper.wrappers.clear();
         }
     }
 }