Bake in TieredMergePolicy

Today we provide the ability to plug in MergePolicy and
we provide the once lucene ships with. We do not recommend to change
the default and even only a small number of expert users would ever touch
this. This commit removes the ancient log byte size and log doc count
merge policy providers, simplifies the MergePolicy wiring and makes the
tiered MP the one and only default. All notions of a merge policy has been
removed from the docs and should be deprecated in the previous version.

Closes #11588
This commit is contained in:
Simon Willnauer 2015-06-10 17:48:39 +02:00
parent 134d898be9
commit f77804dad3
33 changed files with 456 additions and 1194 deletions

View File

@ -41,8 +41,6 @@ import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.IndexQueryParserService;
@ -298,7 +296,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
modules.add(new StoreModule(injector.getInstance(IndexStore.class).shardDirectory(), lock,
new StoreCloseListener(shardId, canDeleteShardContent, shardFilterCache), path));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
try {
shardInjector = modules.createChildInjector(injector);
} catch (CreationException e) {
@ -374,7 +371,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
closeInjectorResource(sId, shardInjector,
MergeSchedulerProvider.class,
MergePolicyProvider.class,
IndexShardGatewayService.class,
PercolatorQueriesRegistry.class);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.similarities.Similarity;
@ -33,7 +34,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
@ -69,7 +69,7 @@ public final class EngineConfig {
private final IndicesWarmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final MergePolicyProvider mergePolicyProvider;
private final MergePolicy mergePolicy;
private final MergeSchedulerProvider mergeScheduler;
private final Analyzer analyzer;
private final Similarity similarity;
@ -142,7 +142,7 @@ public final class EngineConfig {
*/
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer,
MergePolicy mergePolicy, MergeSchedulerProvider mergeScheduler, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy, TranslogConfig translogConfig) {
this.shardId = shardId;
@ -152,7 +152,7 @@ public final class EngineConfig {
this.warmer = warmer;
this.store = store;
this.deletionPolicy = deletionPolicy;
this.mergePolicyProvider = mergePolicyProvider;
this.mergePolicy = mergePolicy;
this.mergeScheduler = mergeScheduler;
this.analyzer = analyzer;
this.similarity = similarity;
@ -340,11 +340,10 @@ public final class EngineConfig {
}
/**
* Returns the {@link org.elasticsearch.index.merge.policy.MergePolicyProvider} used to obtain
* a {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
* Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
*/
public MergePolicyProvider getMergePolicyProvider() {
return mergePolicyProvider;
public MergePolicy getMergePolicy() {
return mergePolicy;
}
/**

View File

@ -45,8 +45,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.ShardId;
@ -81,7 +80,6 @@ public class InternalEngine extends Engine {
@Nullable
private final IndicesWarmer warmer;
private final Translog translog;
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;
private final IndexWriter indexWriter;
@ -116,7 +114,6 @@ public class InternalEngine extends Engine {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.indexingService = engineConfig.getIndexingService();
this.warmer = engineConfig.getWarmer();
this.mergePolicyProvider = engineConfig.getMergePolicyProvider();
this.mergeScheduler = engineConfig.getMergeScheduler();
this.dirtyLocks = new Object[engineConfig.getIndexConcurrency() * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
@ -1020,7 +1017,7 @@ public class InternalEngine extends Engine {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler.newMergeScheduler());
MergePolicy mergePolicy = mergePolicyProvider.getMergePolicy();
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);

View File

@ -1,67 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.policy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.store.Store;
public abstract class AbstractMergePolicyProvider<MP extends MergePolicy> extends AbstractIndexShardComponent implements MergePolicyProvider<MP> {
public static final String INDEX_COMPOUND_FORMAT = "index.compound_format";
protected volatile double noCFSRatio;
protected AbstractMergePolicyProvider(Store store) {
super(store.shardId(), store.indexSettings());
// Default to Lucene's default:
this.noCFSRatio = parseNoCFSRatio(indexSettings.get(INDEX_COMPOUND_FORMAT, Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO)));
}
public static double parseNoCFSRatio(String noCFSRatio) {
noCFSRatio = noCFSRatio.trim();
if (noCFSRatio.equalsIgnoreCase("true")) {
return 1.0d;
} else if (noCFSRatio.equalsIgnoreCase("false")) {
return 0.0;
} else {
try {
double value = Double.parseDouble(noCFSRatio);
if (value < 0.0 || value > 1.0) {
throw new IllegalArgumentException("NoCFSRatio must be in the interval [0..1] but was: [" + value + "]");
}
return value;
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("Expected a boolean or a value in the interval [0..1] but was: [" + noCFSRatio + "]", ex);
}
}
}
public static String formatNoCFSRatio(double ratio) {
if (ratio == 1.0) {
return Boolean.TRUE.toString();
} else if (ratio == 0.0) {
return Boolean.FALSE.toString();
} else {
return Double.toString(ratio);
}
}
}

View File

@ -1,129 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.policy;
import com.google.common.base.Preconditions;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;
/**
*
*/
public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider<LogByteSizeMergePolicy> {
private final IndexSettingsService indexSettingsService;
private final ApplySettings applySettings = new ApplySettings();
private final LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
public static final ByteSizeValue DEFAULT_MIN_MERGE_SIZE = new ByteSizeValue((long) (LogByteSizeMergePolicy.DEFAULT_MIN_MERGE_MB * 1024 * 1024), ByteSizeUnit.BYTES);
public static final ByteSizeValue DEFAULT_MAX_MERGE_SIZE = new ByteSizeValue((long) LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_MB, ByteSizeUnit.MB);
@Inject
public LogByteSizeMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
super(store);
Preconditions.checkNotNull(store, "Store must be provided to merge policy");
this.indexSettingsService = indexSettingsService;
ByteSizeValue minMergeSize = indexSettings.getAsBytesSize("index.merge.policy.min_merge_size", DEFAULT_MIN_MERGE_SIZE);
ByteSizeValue maxMergeSize = indexSettings.getAsBytesSize("index.merge.policy.max_merge_size", DEFAULT_MAX_MERGE_SIZE);
int mergeFactor = indexSettings.getAsInt("index.merge.policy.merge_factor", LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR);
int maxMergeDocs = indexSettings.getAsInt("index.merge.policy.max_merge_docs", LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS);
boolean calibrateSizeByDeletes = indexSettings.getAsBoolean("index.merge.policy.calibrate_size_by_deletes", true);
mergePolicy.setMinMergeMB(minMergeSize.mbFrac());
mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac());
mergePolicy.setMergeFactor(mergeFactor);
mergePolicy.setMaxMergeDocs(maxMergeDocs);
mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes);
mergePolicy.setNoCFSRatio(noCFSRatio);
logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes);
indexSettingsService.addListener(applySettings);
}
@Override
public LogByteSizeMergePolicy getMergePolicy() {
return mergePolicy;
}
@Override
public void close() {
indexSettingsService.removeListener(applySettings);
}
public static final String INDEX_MERGE_POLICY_MIN_MERGE_SIZE = "index.merge.policy.min_merge_size";
public static final String INDEX_MERGE_POLICY_MAX_MERGE_SIZE = "index.merge.policy.max_merge_size";
public static final String INDEX_MERGE_POLICY_MAX_MERGE_DOCS = "index.merge.policy.max_merge_docs";
public static final String INDEX_MERGE_POLICY_MERGE_FACTOR = "index.merge.policy.merge_factor";
public static final String INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES = "index.merge.policy.calibrate_size_by_deletes";
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
double oldMinMergeSizeMB = mergePolicy.getMinMergeMB();
ByteSizeValue minMergeSize = settings.getAsBytesSize(INDEX_MERGE_POLICY_MIN_MERGE_SIZE, null);
if (minMergeSize != null && minMergeSize.mbFrac() != oldMinMergeSizeMB) {
logger.info("updating min_merge_size from [{}mb] to [{}]", oldMinMergeSizeMB, minMergeSize);
mergePolicy.setMinMergeMB(minMergeSize.mbFrac());
}
double oldMaxMergeSizeMB = mergePolicy.getMaxMergeMB();
ByteSizeValue maxMergeSize = settings.getAsBytesSize(INDEX_MERGE_POLICY_MAX_MERGE_SIZE, null);
if (maxMergeSize != null && maxMergeSize.mbFrac() != oldMaxMergeSizeMB) {
logger.info("updating max_merge_size from [{}mb] to [{}]", oldMaxMergeSizeMB, maxMergeSize);
mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac());
}
int oldMaxMergeDocs = mergePolicy.getMaxMergeDocs();
int maxMergeDocs = settings.getAsInt(INDEX_MERGE_POLICY_MAX_MERGE_DOCS, oldMaxMergeDocs);
if (maxMergeDocs != oldMaxMergeDocs) {
logger.info("updating max_merge_docs from [{}] to [{}]", oldMaxMergeDocs, maxMergeDocs);
mergePolicy.setMaxMergeDocs(maxMergeDocs);
}
int oldMergeFactor = mergePolicy.getMergeFactor();
int mergeFactor = settings.getAsInt(INDEX_MERGE_POLICY_MERGE_FACTOR, oldMergeFactor);
if (mergeFactor != oldMergeFactor) {
logger.info("updating merge_factor from [{}] to [{}]", oldMergeFactor, mergeFactor);
mergePolicy.setMergeFactor(mergeFactor);
}
boolean oldCalibrateSizeByDeletes = mergePolicy.getCalibrateSizeByDeletes();
boolean calibrateSizeByDeletes = settings.getAsBoolean(INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES, oldCalibrateSizeByDeletes);
if (calibrateSizeByDeletes != oldCalibrateSizeByDeletes) {
logger.info("updating calibrate_size_by_deletes from [{}] to [{}]", oldCalibrateSizeByDeletes, calibrateSizeByDeletes);
mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes);
}
final double noCFSRatio = parseNoCFSRatio(settings.get(INDEX_COMPOUND_FORMAT, Double.toString(LogByteSizeMergePolicyProvider.this.noCFSRatio)));
if (noCFSRatio != LogByteSizeMergePolicyProvider.this.noCFSRatio) {
logger.info("updating index.compound_format from [{}] to [{}]", formatNoCFSRatio(LogByteSizeMergePolicyProvider.this.noCFSRatio), formatNoCFSRatio(noCFSRatio));
LogByteSizeMergePolicyProvider.this.noCFSRatio = noCFSRatio;
mergePolicy.setNoCFSRatio(noCFSRatio);
}
}
}
}

View File

@ -1,119 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.policy;
import com.google.common.base.Preconditions;
import org.apache.lucene.index.LogDocMergePolicy;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;
/**
*
*/
public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDocMergePolicy> {
private final IndexSettingsService indexSettingsService;
private final ApplySettings applySettings = new ApplySettings();
private final LogDocMergePolicy mergePolicy = new LogDocMergePolicy();
public static final String MAX_MERGE_DOCS_KEY = "index.merge.policy.max_merge_docs";
public static final String MIN_MERGE_DOCS_KEY = "index.merge.policy.min_merge_docs";
public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
@Inject
public LogDocMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
super(store);
Preconditions.checkNotNull(store, "Store must be provided to merge policy");
this.indexSettingsService = indexSettingsService;
int minMergeDocs = indexSettings.getAsInt(MIN_MERGE_DOCS_KEY, LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS);
int maxMergeDocs = indexSettings.getAsInt(MAX_MERGE_DOCS_KEY, LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
int mergeFactor = indexSettings.getAsInt(MERGE_FACTORY_KEY, LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
boolean calibrateSizeByDeletes = indexSettings.getAsBoolean("index.merge.policy.calibrate_size_by_deletes", true);
mergePolicy.setMinMergeDocs(minMergeDocs);
mergePolicy.setMaxMergeDocs(maxMergeDocs);
mergePolicy.setMergeFactor(mergeFactor);
mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes);
mergePolicy.setNoCFSRatio(noCFSRatio);
logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes);
indexSettingsService.addListener(applySettings);
}
@Override
public void close() {
indexSettingsService.removeListener(applySettings);
}
@Override
public LogDocMergePolicy getMergePolicy() {
return mergePolicy;
}
public static final String INDEX_MERGE_POLICY_MIN_MERGE_DOCS = "index.merge.policy.min_merge_docs";
public static final String INDEX_MERGE_POLICY_MAX_MERGE_DOCS = "index.merge.policy.max_merge_docs";
public static final String INDEX_MERGE_POLICY_MERGE_FACTOR = "index.merge.policy.merge_factor";
public static final String INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES = "index.merge.policy.calibrate_size_by_deletes";
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
int oldMinMergeDocs = mergePolicy.getMinMergeDocs();
int minMergeDocs = settings.getAsInt(INDEX_MERGE_POLICY_MIN_MERGE_DOCS, oldMinMergeDocs);
if (minMergeDocs != oldMinMergeDocs) {
logger.info("updating min_merge_docs from [{}] to [{}]", oldMinMergeDocs, minMergeDocs);
mergePolicy.setMinMergeDocs(minMergeDocs);
}
int oldMaxMergeDocs = mergePolicy.getMaxMergeDocs();
int maxMergeDocs = settings.getAsInt(INDEX_MERGE_POLICY_MAX_MERGE_DOCS, oldMaxMergeDocs);
if (maxMergeDocs != oldMaxMergeDocs) {
logger.info("updating max_merge_docs from [{}] to [{}]", oldMaxMergeDocs, maxMergeDocs);
mergePolicy.setMaxMergeDocs(maxMergeDocs);
}
int oldMergeFactor = mergePolicy.getMergeFactor();
int mergeFactor = settings.getAsInt(INDEX_MERGE_POLICY_MERGE_FACTOR, oldMergeFactor);
if (mergeFactor != oldMergeFactor) {
logger.info("updating merge_factor from [{}] to [{}]", oldMergeFactor, mergeFactor);
mergePolicy.setMergeFactor(mergeFactor);
}
boolean oldCalibrateSizeByDeletes = mergePolicy.getCalibrateSizeByDeletes();
boolean calibrateSizeByDeletes = settings.getAsBoolean(INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES, oldCalibrateSizeByDeletes);
if (calibrateSizeByDeletes != oldCalibrateSizeByDeletes) {
logger.info("updating calibrate_size_by_deletes from [{}] to [{}]", oldCalibrateSizeByDeletes, calibrateSizeByDeletes);
mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes);
}
double noCFSRatio = parseNoCFSRatio(settings.get(INDEX_COMPOUND_FORMAT, Double.toString(LogDocMergePolicyProvider.this.noCFSRatio)));
if (noCFSRatio != LogDocMergePolicyProvider.this.noCFSRatio) {
logger.info("updating index.compound_format from [{}] to [{}]", formatNoCFSRatio(LogDocMergePolicyProvider.this.noCFSRatio), formatNoCFSRatio(noCFSRatio));
LogDocMergePolicyProvider.this.noCFSRatio = noCFSRatio;
mergePolicy.setNoCFSRatio(noCFSRatio);
}
}
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.policy;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public class MergePolicyModule extends AbstractModule {
private final Settings settings;
public static final String MERGE_POLICY_TYPE_KEY = "index.merge.policy.type";
public MergePolicyModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(MergePolicyProvider.class)
.to(settings.getAsClass("index.merge.policy.type", TieredMergePolicyProvider.class, "org.elasticsearch.index.merge.policy.", "MergePolicyProvider"))
.asEagerSingleton();
}
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.policy;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.index.shard.IndexShardComponent;
import java.io.Closeable;
/**
*
*/
public interface MergePolicyProvider<T extends MergePolicy> extends IndexShardComponent, Closeable {
T getMergePolicy();
}

View File

@ -1,166 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.policy;
import org.apache.lucene.index.TieredMergePolicy;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;
public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<TieredMergePolicy> {
private final IndexSettingsService indexSettingsService;
private final ApplySettings applySettings = new ApplySettings();
private final TieredMergePolicy mergePolicy = new TieredMergePolicy();
public static final double DEFAULT_EXPUNGE_DELETES_ALLOWED = 10d;
public static final ByteSizeValue DEFAULT_FLOOR_SEGMENT = new ByteSizeValue(2, ByteSizeUnit.MB);
public static final int DEFAULT_MAX_MERGE_AT_ONCE = 10;
public static final int DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT = 30;
public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB);
public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d;
public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d;
@Inject
public TieredMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
super(store);
this.indexSettingsService = indexSettingsService;
double forceMergeDeletesPctAllowed = indexSettings.getAsDouble("index.merge.policy.expunge_deletes_allowed", DEFAULT_EXPUNGE_DELETES_ALLOWED); // percentage
ByteSizeValue floorSegment = indexSettings.getAsBytesSize("index.merge.policy.floor_segment", DEFAULT_FLOOR_SEGMENT);
int maxMergeAtOnce = indexSettings.getAsInt("index.merge.policy.max_merge_at_once", DEFAULT_MAX_MERGE_AT_ONCE);
int maxMergeAtOnceExplicit = indexSettings.getAsInt("index.merge.policy.max_merge_at_once_explicit", DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT);
// TODO is this really a good default number for max_merge_segment, what happens for large indices, won't they end up with many segments?
ByteSizeValue maxMergedSegment = indexSettings.getAsBytesSize("index.merge.policy.max_merged_segment", DEFAULT_MAX_MERGED_SEGMENT);
double segmentsPerTier = indexSettings.getAsDouble("index.merge.policy.segments_per_tier", DEFAULT_SEGMENTS_PER_TIER);
double reclaimDeletesWeight = indexSettings.getAsDouble("index.merge.policy.reclaim_deletes_weight", DEFAULT_RECLAIM_DELETES_WEIGHT);
maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
mergePolicy.setNoCFSRatio(noCFSRatio);
mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
mergePolicy.setFloorSegmentMB(floorSegment.mbFrac());
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
mergePolicy.setMaxMergeAtOnceExplicit(maxMergeAtOnceExplicit);
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
mergePolicy.setSegmentsPerTier(segmentsPerTier);
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
logger.debug("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight);
indexSettingsService.addListener(applySettings);
}
private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) {
// fixing maxMergeAtOnce, see TieredMergePolicy#setMaxMergeAtOnce
if (!(segmentsPerTier >= maxMergeAtOnce)) {
int newMaxMergeAtOnce = (int) segmentsPerTier;
// max merge at once should be at least 2
if (newMaxMergeAtOnce <= 1) {
newMaxMergeAtOnce = 2;
}
logger.debug("[tiered] merge mergePolicy changing max_merge_at_once from [{}] to [{}] because segments_per_tier [{}] has to be higher or equal to it", maxMergeAtOnce, newMaxMergeAtOnce, segmentsPerTier);
maxMergeAtOnce = newMaxMergeAtOnce;
}
return maxMergeAtOnce;
}
@Override
public TieredMergePolicy getMergePolicy() {
return mergePolicy;
}
@Override
public void close() {
indexSettingsService.removeListener(applySettings);
}
public static final String INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED = "index.merge.policy.expunge_deletes_allowed";
public static final String INDEX_MERGE_POLICY_FLOOR_SEGMENT = "index.merge.policy.floor_segment";
public static final String INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE = "index.merge.policy.max_merge_at_once";
public static final String INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT = "index.merge.policy.max_merge_at_once_explicit";
public static final String INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT = "index.merge.policy.max_merged_segment";
public static final String INDEX_MERGE_POLICY_SEGMENTS_PER_TIER = "index.merge.policy.segments_per_tier";
public static final String INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT = "index.merge.policy.reclaim_deletes_weight";
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
final double oldExpungeDeletesPctAllowed = mergePolicy.getForceMergeDeletesPctAllowed();
final double expungeDeletesPctAllowed = settings.getAsDouble(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, oldExpungeDeletesPctAllowed);
if (expungeDeletesPctAllowed != oldExpungeDeletesPctAllowed) {
logger.info("updating [expunge_deletes_allowed] from [{}] to [{}]", oldExpungeDeletesPctAllowed, expungeDeletesPctAllowed);
mergePolicy.setForceMergeDeletesPctAllowed(expungeDeletesPctAllowed);
}
final double oldFloorSegmentMB = mergePolicy.getFloorSegmentMB();
final ByteSizeValue floorSegment = settings.getAsBytesSize(INDEX_MERGE_POLICY_FLOOR_SEGMENT, null);
if (floorSegment != null && floorSegment.mbFrac() != oldFloorSegmentMB) {
logger.info("updating [floor_segment] from [{}mb] to [{}]", oldFloorSegmentMB, floorSegment);
mergePolicy.setFloorSegmentMB(floorSegment.mbFrac());
}
final double oldSegmentsPerTier = mergePolicy.getSegmentsPerTier();
final double segmentsPerTier = settings.getAsDouble(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, oldSegmentsPerTier);
if (segmentsPerTier != oldSegmentsPerTier) {
logger.info("updating [segments_per_tier] from [{}] to [{}]", oldSegmentsPerTier, segmentsPerTier);
mergePolicy.setSegmentsPerTier(segmentsPerTier);
}
final int oldMaxMergeAtOnce = mergePolicy.getMaxMergeAtOnce();
int maxMergeAtOnce = settings.getAsInt(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, oldMaxMergeAtOnce);
if (maxMergeAtOnce != oldMaxMergeAtOnce) {
logger.info("updating [max_merge_at_once] from [{}] to [{}]", oldMaxMergeAtOnce, maxMergeAtOnce);
maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
}
final int oldMaxMergeAtOnceExplicit = mergePolicy.getMaxMergeAtOnceExplicit();
final int maxMergeAtOnceExplicit = settings.getAsInt(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT, oldMaxMergeAtOnceExplicit);
if (maxMergeAtOnceExplicit != oldMaxMergeAtOnceExplicit) {
logger.info("updating [max_merge_at_once_explicit] from [{}] to [{}]", oldMaxMergeAtOnceExplicit, maxMergeAtOnceExplicit);
mergePolicy.setMaxMergeAtOnceExplicit(maxMergeAtOnceExplicit);
}
final double oldMaxMergedSegmentMB = mergePolicy.getMaxMergedSegmentMB();
final ByteSizeValue maxMergedSegment = settings.getAsBytesSize(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT, null);
if (maxMergedSegment != null && maxMergedSegment.mbFrac() != oldMaxMergedSegmentMB) {
logger.info("updating [max_merged_segment] from [{}mb] to [{}]", oldMaxMergedSegmentMB, maxMergedSegment);
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
}
final double oldReclaimDeletesWeight = mergePolicy.getReclaimDeletesWeight();
final double reclaimDeletesWeight = settings.getAsDouble(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, oldReclaimDeletesWeight);
if (reclaimDeletesWeight != oldReclaimDeletesWeight) {
logger.info("updating [reclaim_deletes_weight] from [{}] to [{}]", oldReclaimDeletesWeight, reclaimDeletesWeight);
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
}
double noCFSRatio = parseNoCFSRatio(settings.get(INDEX_COMPOUND_FORMAT, Double.toString(TieredMergePolicyProvider.this.noCFSRatio)));
if (noCFSRatio != TieredMergePolicyProvider.this.noCFSRatio) {
logger.info("updating index.compound_format from [{}] to [{}]", formatNoCFSRatio(TieredMergePolicyProvider.this.noCFSRatio), formatNoCFSRatio(noCFSRatio));
mergePolicy.setNoCFSRatio(noCFSRatio);
TieredMergePolicyProvider.this.noCFSRatio = noCFSRatio;
}
}
}
}

View File

@ -30,16 +30,13 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -76,15 +73,6 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME);
indexDynamicSettings.addDynamicSetting(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS);
indexDynamicSettings.addDynamicSetting(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MIN_MERGE_SIZE, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_SIZE, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(LogByteSizeMergePolicyProvider.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MIN_MERGE_DOCS, Validator.POSITIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
@ -106,14 +94,14 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_REFORMAT);
indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_LEVEL);
indexDynamicSettings.addDynamicSetting(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE, Validator.INTEGER);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, Validator.DOUBLE);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, Validator.DOUBLE);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_INTERVAL, Validator.TIME);
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.merge.policy;
package org.elasticsearch.index.shard;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.merge.policy;
package org.elasticsearch.index.shard;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;

View File

@ -23,6 +23,8 @@ import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
@ -75,7 +77,6 @@ import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
@ -155,9 +156,9 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndicesWarmer warmer;
private final SnapshotDeletionPolicy deletionPolicy;
private final SimilarityService similarityService;
private final MergePolicyProvider mergePolicyProvider;
private final EngineConfig engineConfig;
private final TranslogConfig translogConfig;
private final MergePolicyConfig mergePolicyConfig;
private TimeValue refreshInterval;
@ -198,14 +199,13 @@ public class IndexShard extends AbstractIndexShardComponent {
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService,
ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory,
ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path, BigArrays bigArrays) {
super(shardId, indexSettingsService.getSettings());
this.codecService = codecService;
this.warmer = warmer;
this.deletionPolicy = deletionPolicy;
this.similarityService = similarityService;
this.mergePolicyProvider = mergePolicyProvider;
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
this.engineFactory = factory;
@ -241,6 +241,7 @@ public class IndexShard extends AbstractIndexShardComponent {
indexSettingsService.addListener(applyRefreshSettings);
this.mapperAnalyzer = new MapperAnalyzer(mapperService);
this.path = path;
this.mergePolicyConfig = new MergePolicyConfig(logger, indexSettings);
/* create engine config */
logger.debug("state: [CREATED]");
@ -251,6 +252,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.engineConfig = newEngineConfig(translogConfig);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
}
public Store store() {
@ -1096,6 +1098,7 @@ public class IndexShard extends AbstractIndexShardComponent {
config.setVersionMapSizeSetting(versionMapSize);
}
}
mergePolicyConfig.onRefreshSettings(settings);
if (change) {
refresh("apply settings");
}
@ -1335,7 +1338,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, mergePolicyProvider, mergeScheduler,
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeScheduler,
mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy(), translogConfig);
}

View File

@ -0,0 +1,190 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettingsService;
public final class MergePolicyConfig implements IndexSettingsService.Listener{
private final TieredMergePolicy mergePolicy = new TieredMergePolicy();
private final ESLogger logger;
private final boolean mergesEnabled;
private volatile double noCFSRatio;
public static final double DEFAULT_EXPUNGE_DELETES_ALLOWED = 10d;
public static final ByteSizeValue DEFAULT_FLOOR_SEGMENT = new ByteSizeValue(2, ByteSizeUnit.MB);
public static final int DEFAULT_MAX_MERGE_AT_ONCE = 10;
public static final int DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT = 30;
public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB);
public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d;
public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d;
public static final String INDEX_COMPOUND_FORMAT = "index.compound_format";
public static final String INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED = "index.merge.policy.expunge_deletes_allowed";
public static final String INDEX_MERGE_POLICY_FLOOR_SEGMENT = "index.merge.policy.floor_segment";
public static final String INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE = "index.merge.policy.max_merge_at_once";
public static final String INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT = "index.merge.policy.max_merge_at_once_explicit";
public static final String INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT = "index.merge.policy.max_merged_segment";
public static final String INDEX_MERGE_POLICY_SEGMENTS_PER_TIER = "index.merge.policy.segments_per_tier";
public static final String INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT = "index.merge.policy.reclaim_deletes_weight";
public static final String INDEX_MERGE_ENABLED = "index.merge.enabled";
public MergePolicyConfig(ESLogger logger, Settings indexSettings) {
this.logger = logger;
this.noCFSRatio = parseNoCFSRatio(indexSettings.get(INDEX_COMPOUND_FORMAT, Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO)));
double forceMergeDeletesPctAllowed = indexSettings.getAsDouble("index.merge.policy.expunge_deletes_allowed", DEFAULT_EXPUNGE_DELETES_ALLOWED); // percentage
ByteSizeValue floorSegment = indexSettings.getAsBytesSize("index.merge.policy.floor_segment", DEFAULT_FLOOR_SEGMENT);
int maxMergeAtOnce = indexSettings.getAsInt("index.merge.policy.max_merge_at_once", DEFAULT_MAX_MERGE_AT_ONCE);
int maxMergeAtOnceExplicit = indexSettings.getAsInt("index.merge.policy.max_merge_at_once_explicit", DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT);
// TODO is this really a good default number for max_merge_segment, what happens for large indices, won't they end up with many segments?
ByteSizeValue maxMergedSegment = indexSettings.getAsBytesSize("index.merge.policy.max_merged_segment", DEFAULT_MAX_MERGED_SEGMENT);
double segmentsPerTier = indexSettings.getAsDouble("index.merge.policy.segments_per_tier", DEFAULT_SEGMENTS_PER_TIER);
double reclaimDeletesWeight = indexSettings.getAsDouble("index.merge.policy.reclaim_deletes_weight", DEFAULT_RECLAIM_DELETES_WEIGHT);
this.mergesEnabled = indexSettings.getAsBoolean(INDEX_MERGE_ENABLED, true);
if (mergesEnabled == false) {
logger.warn("[{}] is set to false, this should only be used in tests and can cause serious problems in production environments", INDEX_MERGE_ENABLED);
}
maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
mergePolicy.setNoCFSRatio(noCFSRatio);
mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
mergePolicy.setFloorSegmentMB(floorSegment.mbFrac());
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
mergePolicy.setMaxMergeAtOnceExplicit(maxMergeAtOnceExplicit);
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
mergePolicy.setSegmentsPerTier(segmentsPerTier);
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
logger.debug("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight);
}
private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) {
// fixing maxMergeAtOnce, see TieredMergePolicy#setMaxMergeAtOnce
if (!(segmentsPerTier >= maxMergeAtOnce)) {
int newMaxMergeAtOnce = (int) segmentsPerTier;
// max merge at once should be at least 2
if (newMaxMergeAtOnce <= 1) {
newMaxMergeAtOnce = 2;
}
logger.debug("changing max_merge_at_once from [{}] to [{}] because segments_per_tier [{}] has to be higher or equal to it", maxMergeAtOnce, newMaxMergeAtOnce, segmentsPerTier);
maxMergeAtOnce = newMaxMergeAtOnce;
}
return maxMergeAtOnce;
}
public MergePolicy getMergePolicy() {
return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE;
}
@Override
public void onRefreshSettings(Settings settings) {
final double oldExpungeDeletesPctAllowed = mergePolicy.getForceMergeDeletesPctAllowed();
final double expungeDeletesPctAllowed = settings.getAsDouble(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, oldExpungeDeletesPctAllowed);
if (expungeDeletesPctAllowed != oldExpungeDeletesPctAllowed) {
logger.info("updating [expunge_deletes_allowed] from [{}] to [{}]", oldExpungeDeletesPctAllowed, expungeDeletesPctAllowed);
mergePolicy.setForceMergeDeletesPctAllowed(expungeDeletesPctAllowed);
}
final double oldFloorSegmentMB = mergePolicy.getFloorSegmentMB();
final ByteSizeValue floorSegment = settings.getAsBytesSize(INDEX_MERGE_POLICY_FLOOR_SEGMENT, null);
if (floorSegment != null && floorSegment.mbFrac() != oldFloorSegmentMB) {
logger.info("updating [floor_segment] from [{}mb] to [{}]", oldFloorSegmentMB, floorSegment);
mergePolicy.setFloorSegmentMB(floorSegment.mbFrac());
}
final double oldSegmentsPerTier = mergePolicy.getSegmentsPerTier();
final double segmentsPerTier = settings.getAsDouble(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, oldSegmentsPerTier);
if (segmentsPerTier != oldSegmentsPerTier) {
logger.info("updating [segments_per_tier] from [{}] to [{}]", oldSegmentsPerTier, segmentsPerTier);
mergePolicy.setSegmentsPerTier(segmentsPerTier);
}
final int oldMaxMergeAtOnce = mergePolicy.getMaxMergeAtOnce();
int maxMergeAtOnce = settings.getAsInt(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, oldMaxMergeAtOnce);
if (maxMergeAtOnce != oldMaxMergeAtOnce) {
logger.info("updating [max_merge_at_once] from [{}] to [{}]", oldMaxMergeAtOnce, maxMergeAtOnce);
maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
}
final int oldMaxMergeAtOnceExplicit = mergePolicy.getMaxMergeAtOnceExplicit();
final int maxMergeAtOnceExplicit = settings.getAsInt(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT, oldMaxMergeAtOnceExplicit);
if (maxMergeAtOnceExplicit != oldMaxMergeAtOnceExplicit) {
logger.info("updating [max_merge_at_once_explicit] from [{}] to [{}]", oldMaxMergeAtOnceExplicit, maxMergeAtOnceExplicit);
mergePolicy.setMaxMergeAtOnceExplicit(maxMergeAtOnceExplicit);
}
final double oldMaxMergedSegmentMB = mergePolicy.getMaxMergedSegmentMB();
final ByteSizeValue maxMergedSegment = settings.getAsBytesSize(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT, null);
if (maxMergedSegment != null && maxMergedSegment.mbFrac() != oldMaxMergedSegmentMB) {
logger.info("updating [max_merged_segment] from [{}mb] to [{}]", oldMaxMergedSegmentMB, maxMergedSegment);
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
}
final double oldReclaimDeletesWeight = mergePolicy.getReclaimDeletesWeight();
final double reclaimDeletesWeight = settings.getAsDouble(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, oldReclaimDeletesWeight);
if (reclaimDeletesWeight != oldReclaimDeletesWeight) {
logger.info("updating [reclaim_deletes_weight] from [{}] to [{}]", oldReclaimDeletesWeight, reclaimDeletesWeight);
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
}
double noCFSRatio = parseNoCFSRatio(settings.get(INDEX_COMPOUND_FORMAT, Double.toString(MergePolicyConfig.this.noCFSRatio)));
if (noCFSRatio != MergePolicyConfig.this.noCFSRatio) {
logger.info("updating index.compound_format from [{}] to [{}]", formatNoCFSRatio(MergePolicyConfig.this.noCFSRatio), formatNoCFSRatio(noCFSRatio));
mergePolicy.setNoCFSRatio(noCFSRatio);
MergePolicyConfig.this.noCFSRatio = noCFSRatio;
}
}
public static double parseNoCFSRatio(String noCFSRatio) {
noCFSRatio = noCFSRatio.trim();
if (noCFSRatio.equalsIgnoreCase("true")) {
return 1.0d;
} else if (noCFSRatio.equalsIgnoreCase("false")) {
return 0.0;
} else {
try {
double value = Double.parseDouble(noCFSRatio);
if (value < 0.0 || value > 1.0) {
throw new IllegalArgumentException("NoCFSRatio must be in the interval [0..1] but was: [" + value + "]");
}
return value;
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("Expected a boolean or a value in the interval [0..1] but was: [" + noCFSRatio + "]", ex);
}
}
}
public static String formatNoCFSRatio(double ratio) {
if (ratio == 1.0) {
return Boolean.TRUE.toString();
} else if (ratio == 0.0) {
return Boolean.FALSE.toString();
} else {
return Double.toString(ratio);
}
}
}

View File

@ -40,7 +40,6 @@ import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
@ -51,7 +50,6 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
@ -81,7 +79,7 @@ public final class ShadowIndexShard extends IndexShard {
IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache,
ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer,
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
MergePolicyProvider mergePolicyProvider, EngineFactory factory, ClusterService clusterService,
EngineFactory factory, ClusterService clusterService,
NodeEnvironment nodeEnv, ShardPath path, BigArrays bigArrays) throws IOException {
super(shardId, indexSettingsService, indicesLifecycle, store, mergeScheduler,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
@ -89,7 +87,7 @@ public final class ShadowIndexShard extends IndexShard {
shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService,
termVectorsService, indexFieldDataService, indexService, shardSuggestService,
shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService,
mergePolicyProvider, factory, clusterService, nodeEnv, path, bigArrays);
factory, clusterService, nodeEnv, path, bigArrays);
}
/**

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.merge.policy;
package org.elasticsearch.index.shard;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.CodecReader;

View File

@ -40,10 +40,9 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexException;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.action.admin.indices.upgrade.UpgradeTest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@ -53,7 +52,6 @@ import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Before;
@ -110,7 +108,7 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
@Override
public Settings nodeSettings(int ord) {
return Settings.builder()
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class) // disable merging so no segments will be upgraded
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) // disable merging so no segments will be upgraded
.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, 30) // increase recovery speed for small files
.build();
}

View File

@ -34,7 +34,7 @@ import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.MatcherAssert;
import org.junit.Test;

View File

@ -26,7 +26,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;
@ -53,9 +53,6 @@ public class InternalEngineMergeTests extends ElasticsearchIntegrationTest {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(LogDocMergePolicyProvider.MIN_MERGE_DOCS_KEY, 10)
.put(LogDocMergePolicyProvider.MERGE_FACTORY_KEY, 5)
.put(LogByteSizeMergePolicy.DEFAULT_MIN_MERGE_MB, 0.5)
.build()));
long id = 0;
final int rounds = scaledRandomIntBetween(50, 300);

View File

@ -64,8 +64,6 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexDynamicSettingsModule;
@ -99,7 +97,6 @@ import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.hamcrest.Matchers.*;
public class InternalEngineTests extends ElasticsearchTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
@ -234,29 +231,25 @@ public class InternalEngineTests extends ElasticsearchTestCase {
return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
}
protected MergePolicyProvider<?> createMergePolicy() {
return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS));
}
protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) {
return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService);
}
protected InternalEngine createEngine(Store store, Path translogPath) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
return createEngine(indexSettingsService, store, translogPath, createMergeScheduler(indexSettingsService));
return createEngine(indexSettingsService, store, translogPath, createMergeScheduler(indexSettingsService), newMergePolicy());
}
protected InternalEngine createEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider) {
return new InternalEngine(config(indexSettingsService, store, translogPath, mergeSchedulerProvider), false);
protected InternalEngine createEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettingsService, store, translogPath, mergeSchedulerProvider, mergePolicy), false);
}
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider) {
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettingsService.getSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
, null, store, createSnapshotDeletionPolicy(), createMergePolicy(), mergeSchedulerProvider,
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerProvider,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
@ -423,8 +416,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
try (Store store = createStore();
Engine engine = createEngine(indexSettingsService, store, createTempDir(), mergeSchedulerProvider)) {
Engine engine = createEngine(indexSettingsService, store, createTempDir(), mergeSchedulerProvider, new TieredMergePolicy())) {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
@ -903,7 +895,11 @@ public class InternalEngineTests extends ElasticsearchTestCase {
}
}
public void testForceMerge() {
public void testForceMerge() throws IOException {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
try (Store store = createStore();
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService),
new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null);
@ -925,7 +921,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertEquals(engine.segments(true).size(), 1);
try (Engine.Searcher test = engine.acquireSearcher("test")) {
assertEquals(numDocs - 1, test.reader().numDocs());
assertEquals(numDocs - 1, test.reader().maxDoc());
assertEquals(engine.config().getMergePolicy().toString(), numDocs - 1, test.reader().maxDoc());
}
doc = testParsedDocument(Integer.toString(1), Integer.toString(1), "test", null, -1, -1, testDocument(), B_1, null);
@ -939,6 +935,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertEquals(numDocs - 1, test.reader().maxDoc());
}
}
}
public void testForceMergeAndClose() throws IOException, InterruptedException {
int numIters = randomIntBetween(2, 10);
@ -1352,7 +1349,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void testEnableGcDeletes() throws Exception {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
try (Store store = createStore();
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService)), false)) {
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService), newMergePolicy()), false)) {
engine.config().setEnableGcDeletes(false);
// Add document
@ -1574,7 +1571,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
.put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), indexSettings);
try (Store store = createStore();
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService)),
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService), newMergePolicy()),
false)) {
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
@ -1625,7 +1622,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// now it should be OK.
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings)
.put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build());
engine = createEngine(indexSettingsService, store, primaryTranslogDir, createMergeScheduler(indexSettingsService));
engine = createEngine(indexSettingsService, store, primaryTranslogDir, createMergeScheduler(indexSettingsService), newMergePolicy());
}
public void testTranslogReplayWithFailure() throws IOException {
@ -1878,7 +1875,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettingsService()
, null, store, createSnapshotDeletionPolicy(), createMergePolicy(), config.getMergeScheduler(),
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeScheduler(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);

View File

@ -23,10 +23,7 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
@ -51,8 +48,6 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
@ -202,10 +197,6 @@ public class ShadowEngineTests extends ElasticsearchTestCase {
return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
}
protected MergePolicyProvider<?> createMergePolicy() {
return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS));
}
protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) {
return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService);
}
@ -232,7 +223,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettingsService.getSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
, null, store, createSnapshotDeletionPolicy(), createMergePolicy(), mergeSchedulerProvider,
, null, store, createSnapshotDeletionPolicy(),newMergePolicy(), mergeSchedulerProvider,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {

View File

@ -1,326 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.policy;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
import static org.hamcrest.Matchers.equalTo;
public class MergePolicySettingsTest extends ElasticsearchTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
@Test
public void testCompoundFileSettings() throws IOException {
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
assertThat(new TieredMergePolicyProvider(createStore(EMPTY_SETTINGS), service).getMergePolicy().getNoCFSRatio(), equalTo(0.1));
assertThat(new TieredMergePolicyProvider(createStore(build(true)), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new TieredMergePolicyProvider(createStore(build(0.5)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.5));
assertThat(new TieredMergePolicyProvider(createStore(build(1.0)), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new TieredMergePolicyProvider(createStore(build("true")), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new TieredMergePolicyProvider(createStore(build("True")), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new TieredMergePolicyProvider(createStore(build("False")), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new TieredMergePolicyProvider(createStore(build("false")), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new TieredMergePolicyProvider(createStore(build(false)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new TieredMergePolicyProvider(createStore(build(0)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new TieredMergePolicyProvider(createStore(build(0.0)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(EMPTY_SETTINGS), service).getMergePolicy().getNoCFSRatio(), equalTo(0.1));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build(true)), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build(0.5)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.5));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build(1.0)), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build("true")), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build("True")), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build("False")), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build("false")), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build(false)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build(0)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogByteSizeMergePolicyProvider(createStore(build(0.0)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogDocMergePolicyProvider(createStore(EMPTY_SETTINGS), service).getMergePolicy().getNoCFSRatio(), equalTo(0.1));
assertThat(new LogDocMergePolicyProvider(createStore(build(true)), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogDocMergePolicyProvider(createStore(build(0.5)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.5));
assertThat(new LogDocMergePolicyProvider(createStore(build(1.0)), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogDocMergePolicyProvider(createStore(build("true")), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogDocMergePolicyProvider(createStore(build("True")), service).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new LogDocMergePolicyProvider(createStore(build("False")), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogDocMergePolicyProvider(createStore(build("false")), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogDocMergePolicyProvider(createStore(build(false)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogDocMergePolicyProvider(createStore(build(0)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new LogDocMergePolicyProvider(createStore(build(0.0)), service).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
}
@Test
public void testInvalidValue() throws IOException {
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
try {
new LogDocMergePolicyProvider(createStore(build(-0.1)), service).getMergePolicy().getNoCFSRatio();
fail("exception expected");
} catch (IllegalArgumentException ex) {
}
try {
new LogDocMergePolicyProvider(createStore(build(1.1)), service).getMergePolicy().getNoCFSRatio();
fail("exception expected");
} catch (IllegalArgumentException ex) {
}
try {
new LogDocMergePolicyProvider(createStore(build("Falsch")), service).getMergePolicy().getNoCFSRatio();
fail("exception expected");
} catch (IllegalArgumentException ex) {
}
}
@Test
public void testUpdateSettings() throws IOException {
{
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
TieredMergePolicyProvider mp = new TieredMergePolicyProvider(createStore(EMPTY_SETTINGS), service);
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
service.refreshSettings(build(1.0));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(1.0));
service.refreshSettings(build(0.1));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
service.refreshSettings(build(0.0));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.0));
}
{
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
LogByteSizeMergePolicyProvider mp = new LogByteSizeMergePolicyProvider(createStore(EMPTY_SETTINGS), service);
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
service.refreshSettings(build(1.0));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(1.0));
service.refreshSettings(build(0.1));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
service.refreshSettings(build(0.0));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.0));
}
{
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
LogDocMergePolicyProvider mp = new LogDocMergePolicyProvider(createStore(EMPTY_SETTINGS), service);
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
service.refreshSettings(build(1.0));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(1.0));
service.refreshSettings(build(0.1));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
service.refreshSettings(build(0.0));
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.0));
}
}
public void testLogDocSizeMergePolicySettingsUpdate() throws IOException {
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
LogDocMergePolicyProvider mp = new LogDocMergePolicyProvider(createStore(EMPTY_SETTINGS), service);
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
service.refreshSettings(Settings.builder().put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2).build());
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2);
assertEquals(mp.getMergePolicy().getMinMergeDocs(), LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS);
service.refreshSettings(Settings.builder().put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MIN_MERGE_DOCS, LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS / 2).build());
assertEquals(mp.getMergePolicy().getMinMergeDocs(), LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS / 2);
assertTrue(mp.getMergePolicy().getCalibrateSizeByDeletes());
service.refreshSettings(Settings.builder().put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES, false).build());
assertFalse(mp.getMergePolicy().getCalibrateSizeByDeletes());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
service.refreshSettings(Settings.builder().put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, LogDocMergePolicy.DEFAULT_MERGE_FACTOR * 2).build());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogDocMergePolicy.DEFAULT_MERGE_FACTOR * 2);
service.refreshSettings(EMPTY_SETTINGS); // update without the settings and see if we stick to the values
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2);
assertEquals(mp.getMergePolicy().getMinMergeDocs(), LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS / 2);
assertFalse(mp.getMergePolicy().getCalibrateSizeByDeletes());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR * 2);
service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
mp = new LogDocMergePolicyProvider(createStore(Settings.builder()
.put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2)
.put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR / 2)
.put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES, false)
.put(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MIN_MERGE_DOCS, LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS - 1)
.build()), service);
assertEquals(mp.getMergePolicy().getMinMergeDocs(), LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS - 1);
assertFalse(mp.getMergePolicy().getCalibrateSizeByDeletes());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR / 2);
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2);
}
public void testLogByteSizeMergePolicySettingsUpdate() throws IOException {
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
LogByteSizeMergePolicyProvider mp = new LogByteSizeMergePolicyProvider(createStore(EMPTY_SETTINGS), service);
assertEquals(mp.getMergePolicy().getMaxMergeMB(), LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SIZE.mbFrac(), 0.0d);
service.refreshSettings(Settings.builder().put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_SIZE, new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SIZE.mb() / 2, ByteSizeUnit.MB)).build());
assertEquals(mp.getMergePolicy().getMaxMergeMB(), new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SIZE.mb() / 2, ByteSizeUnit.MB).mbFrac(), 0.0d);
assertEquals(mp.getMergePolicy().getMinMergeMB(), LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_SIZE.mbFrac(), 0.0d);
service.refreshSettings(Settings.builder().put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MIN_MERGE_SIZE, new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_SIZE.mb() + 1, ByteSizeUnit.MB)).build());
assertEquals(mp.getMergePolicy().getMinMergeMB(), new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_SIZE.mb() + 1, ByteSizeUnit.MB).mbFrac(), 0.0d);
assertTrue(mp.getMergePolicy().getCalibrateSizeByDeletes());
service.refreshSettings(Settings.builder().put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES, false).build());
assertFalse(mp.getMergePolicy().getCalibrateSizeByDeletes());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR);
service.refreshSettings(Settings.builder().put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR / 2).build());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR / 2);
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS);
service.refreshSettings(Settings.builder().put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2).build());
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2);
service.refreshSettings(EMPTY_SETTINGS); // update without the settings and see if we stick to the values
assertEquals(mp.getMergePolicy().getMaxMergeMB(), new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SIZE.mb() / 2, ByteSizeUnit.MB).mbFrac(), 0.0d);
assertEquals(mp.getMergePolicy().getMinMergeMB(), new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_SIZE.mb() + 1, ByteSizeUnit.MB).mbFrac(), 0.0d);
assertFalse(mp.getMergePolicy().getCalibrateSizeByDeletes());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR / 2);
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS / 2);
service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
mp = new LogByteSizeMergePolicyProvider(createStore(Settings.builder()
.put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS * 2)
.put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR * 2)
.put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_SIZE, new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SIZE.mb() / 2, ByteSizeUnit.MB))
.put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_CALIBRATE_SIZE_BY_DELETES, false)
.put(LogByteSizeMergePolicyProvider.INDEX_MERGE_POLICY_MIN_MERGE_SIZE, new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_SIZE.mb() + 1, ByteSizeUnit.MB))
.build()), service);
assertEquals(mp.getMergePolicy().getMaxMergeMB(), new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SIZE.mb() / 2, ByteSizeUnit.MB).mbFrac(), 0.0d);
assertEquals(mp.getMergePolicy().getMinMergeMB(), new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_SIZE.mb() + 1, ByteSizeUnit.MB).mbFrac(), 0.0d);
assertFalse(mp.getMergePolicy().getCalibrateSizeByDeletes());
assertEquals(mp.getMergePolicy().getMergeFactor(), LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR * 2);
assertEquals(mp.getMergePolicy().getMaxMergeDocs(), LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS * 2);
}
public void testTieredMergePolicySettingsUpdate() throws IOException {
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
TieredMergePolicyProvider mp = new TieredMergePolicyProvider(createStore(EMPTY_SETTINGS), service);
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
assertEquals(mp.getMergePolicy().getForceMergeDeletesPctAllowed(), TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED, 0.0d);
service.refreshSettings(Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d).build());
assertEquals(mp.getMergePolicy().getForceMergeDeletesPctAllowed(), TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, 0.0d);
assertEquals(mp.getMergePolicy().getFloorSegmentMB(), TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.mbFrac(), 0);
service.refreshSettings(Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT, new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.mb() + 1, ByteSizeUnit.MB)).build());
assertEquals(mp.getMergePolicy().getFloorSegmentMB(), new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.mb() + 1, ByteSizeUnit.MB).mbFrac(), 0.001);
assertEquals(mp.getMergePolicy().getMaxMergeAtOnce(), TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE);
service.refreshSettings(Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE -1 ).build());
assertEquals(mp.getMergePolicy().getMaxMergeAtOnce(), TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE-1);
assertEquals(mp.getMergePolicy().getMaxMergeAtOnceExplicit(), TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT);
service.refreshSettings(Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT, TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT -1 ).build());
assertEquals(mp.getMergePolicy().getMaxMergeAtOnceExplicit(), TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT-1);
assertEquals(mp.getMergePolicy().getMaxMergedSegmentMB(), TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.mbFrac(), 0.0001);
service.refreshSettings(Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT, new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.bytes() + 1)).build());
assertEquals(mp.getMergePolicy().getMaxMergedSegmentMB(), new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.bytes() + 1).mbFrac(), 0.0001);
assertEquals(mp.getMergePolicy().getReclaimDeletesWeight(), TieredMergePolicyProvider.DEFAULT_RECLAIM_DELETES_WEIGHT, 0);
service.refreshSettings(Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, TieredMergePolicyProvider.DEFAULT_RECLAIM_DELETES_WEIGHT + 1 ).build());
assertEquals(mp.getMergePolicy().getReclaimDeletesWeight(), TieredMergePolicyProvider.DEFAULT_RECLAIM_DELETES_WEIGHT + 1, 0);
assertEquals(mp.getMergePolicy().getSegmentsPerTier(), TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER, 0);
service.refreshSettings(Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER + 1 ).build());
assertEquals(mp.getMergePolicy().getSegmentsPerTier(), TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER + 1, 0);
service.refreshSettings(EMPTY_SETTINGS); // update without the settings and see if we stick to the values
assertEquals(mp.getMergePolicy().getForceMergeDeletesPctAllowed(), TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, 0.0d);
assertEquals(mp.getMergePolicy().getFloorSegmentMB(), new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.mb() + 1, ByteSizeUnit.MB).mbFrac(), 0.001);
assertEquals(mp.getMergePolicy().getMaxMergeAtOnce(), TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE-1);
assertEquals(mp.getMergePolicy().getMaxMergeAtOnceExplicit(), TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT-1);
assertEquals(mp.getMergePolicy().getMaxMergedSegmentMB(), new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.bytes() + 1).mbFrac(), 0.0001);
assertEquals(mp.getMergePolicy().getReclaimDeletesWeight(), TieredMergePolicyProvider.DEFAULT_RECLAIM_DELETES_WEIGHT + 1, 0);
assertEquals(mp.getMergePolicy().getSegmentsPerTier(), TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER + 1, 0);
}
public Settings build(String value) {
return Settings.builder().put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT, value).build();
}
public Settings build(double value) {
return Settings.builder().put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT, value).build();
}
public Settings build(int value) {
return Settings.builder().put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT, value).build();
}
public Settings build(boolean value) {
return Settings.builder().put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT, value).build();
}
protected Store createStore(Settings settings) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
@Override
public Directory newDirectory() throws IOException {
return new RAMDirectory() ;
}
@Override
public long throttleTimeInNanos() {
return 0;
}
};
return new Store(shardId, settings, directoryService, new DummyShardLock(shardId));
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
import static org.hamcrest.Matchers.equalTo;
public class MergePolicySettingsTest extends ElasticsearchTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
@Test
public void testCompoundFileSettings() throws IOException {
assertThat(new MergePolicyConfig(logger, EMPTY_SETTINGS).getMergePolicy().getNoCFSRatio(), equalTo(0.1));
assertThat(new MergePolicyConfig(logger, build(true)).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new MergePolicyConfig(logger, build(0.5)).getMergePolicy().getNoCFSRatio(), equalTo(0.5));
assertThat(new MergePolicyConfig(logger, build(1.0)).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new MergePolicyConfig(logger, build("true")).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new MergePolicyConfig(logger, build("True")).getMergePolicy().getNoCFSRatio(), equalTo(1.0));
assertThat(new MergePolicyConfig(logger, build("False")).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new MergePolicyConfig(logger, build("false")).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new MergePolicyConfig(logger, build(false)).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new MergePolicyConfig(logger, build(0)).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
assertThat(new MergePolicyConfig(logger, build(0.0)).getMergePolicy().getNoCFSRatio(), equalTo(0.0));
}
public void testNoMerges() {
MergePolicyConfig mp = new MergePolicyConfig(logger, Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build());
assertTrue(mp.getMergePolicy() instanceof NoMergePolicy);
}
@Test
public void testUpdateSettings() throws IOException {
{
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
MergePolicyConfig mp = new MergePolicyConfig(logger, EMPTY_SETTINGS);
assertThat(((TieredMergePolicy) mp.getMergePolicy()).getNoCFSRatio(), equalTo(0.1));
mp.onRefreshSettings(build(1.0));
assertThat(((TieredMergePolicy) mp.getMergePolicy()).getNoCFSRatio(), equalTo(1.0));
mp.onRefreshSettings(build(0.1));
assertThat(((TieredMergePolicy) mp.getMergePolicy()).getNoCFSRatio(), equalTo(0.1));
mp.onRefreshSettings(build(0.0));
assertThat(((TieredMergePolicy) mp.getMergePolicy()).getNoCFSRatio(), equalTo(0.0));
}
}
public void testTieredMergePolicySettingsUpdate() throws IOException {
MergePolicyConfig mp = new MergePolicyConfig(logger, EMPTY_SETTINGS);
assertThat(mp.getMergePolicy().getNoCFSRatio(), equalTo(0.1));
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getForceMergeDeletesPctAllowed(), MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED, 0.0d);
mp.onRefreshSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d).build());
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getForceMergeDeletesPctAllowed(), MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, 0.0d);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getFloorSegmentMB(), MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.mbFrac(), 0);
mp.onRefreshSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT, new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.mb() + 1, ByteSizeUnit.MB)).build());
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getFloorSegmentMB(), new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.mb() + 1, ByteSizeUnit.MB).mbFrac(), 0.001);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergeAtOnce(), MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE);
mp.onRefreshSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE - 1).build());
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergeAtOnce(), MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE-1);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergeAtOnceExplicit(), MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT);
mp.onRefreshSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT, MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT - 1).build());
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergeAtOnceExplicit(), MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT-1);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergedSegmentMB(), MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.mbFrac(), 0.0001);
mp.onRefreshSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT, new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.bytes() + 1)).build());
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergedSegmentMB(), new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.bytes() + 1).mbFrac(), 0.0001);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getReclaimDeletesWeight(), MergePolicyConfig.DEFAULT_RECLAIM_DELETES_WEIGHT, 0);
mp.onRefreshSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, MergePolicyConfig.DEFAULT_RECLAIM_DELETES_WEIGHT + 1).build());
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getReclaimDeletesWeight(), MergePolicyConfig.DEFAULT_RECLAIM_DELETES_WEIGHT + 1, 0);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getSegmentsPerTier(), MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER, 0);
mp.onRefreshSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER + 1).build());
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getSegmentsPerTier(), MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER + 1, 0);
mp.onRefreshSettings(EMPTY_SETTINGS); // update without the settings and see if we stick to the values
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getForceMergeDeletesPctAllowed(), MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, 0.0d);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getFloorSegmentMB(), new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.mb() + 1, ByteSizeUnit.MB).mbFrac(), 0.001);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergeAtOnce(), MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE-1);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergeAtOnceExplicit(), MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE_EXPLICIT-1);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getMaxMergedSegmentMB(), new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.bytes() + 1).mbFrac(), 0.0001);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getReclaimDeletesWeight(), MergePolicyConfig.DEFAULT_RECLAIM_DELETES_WEIGHT + 1, 0);
assertEquals(((TieredMergePolicy) mp.getMergePolicy()).getSegmentsPerTier(), MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER + 1, 0);
}
public Settings build(String value) {
return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT, value).build();
}
public Settings build(double value) {
return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT, value).build();
}
public Settings build(int value) {
return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT, value).build();
}
public Settings build(boolean value) {
return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT, value).build();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.merge.policy;
package org.elasticsearch.index.shard;
import org.apache.lucene.analysis.CannedTokenStream;
import org.apache.lucene.analysis.Token;
@ -36,6 +36,7 @@ import org.apache.lucene.util.TestUtil;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.shard.VersionFieldUpgrader;
import org.elasticsearch.test.ElasticsearchTestCase;
/** Tests upgrading old document versions from _uid payloads to _version docvalues */

View File

@ -51,12 +51,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
@ -67,7 +63,6 @@ import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
@ -127,7 +122,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
@ -233,7 +228,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
@ -464,7 +459,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
@ -520,7 +515,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)

View File

@ -31,8 +31,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -147,8 +147,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE, "none")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, 0) // get stats all the time - no caching
@ -319,8 +319,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, "true")
@ -363,8 +363,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "10000")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
));

View File

@ -42,9 +42,9 @@ import org.elasticsearch.index.cache.filter.FilterCacheModule;
import org.elasticsearch.index.cache.filter.FilterCacheModule.FilterCacheSettings;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.index.IndexFilterCache;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.search.sort.SortOrder;
@ -285,8 +285,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
));
@ -318,8 +318,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "1")
.put("index.merge.policy.type", "tiered")

View File

@ -35,14 +35,12 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.junit.Test;
import java.io.IOException;
@ -66,7 +64,7 @@ public class ParentFieldLoadingBwcTest extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexShard.INDEX_REFRESH_INTERVAL, -1)
// We never want merges in this test to ensure we have two segments for the last validation
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_1_6_0)
.build();

View File

@ -30,11 +30,10 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.junit.Test;
import java.io.IOException;
@ -53,7 +52,7 @@ public class ParentFieldLoadingTest extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexShard.INDEX_REFRESH_INTERVAL, -1)
// We never want merges in this test to ensure we have two segments for the last validation
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.build();
@Test

View File

@ -101,8 +101,8 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Loading;
import org.elasticsearch.index.mapper.internal.SizeFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.merge.policy.*;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogService;
@ -524,24 +524,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
private static Settings.Builder setRandomMerge(Random random, Settings.Builder builder) {
if (random.nextBoolean()) {
builder.put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT,
builder.put(MergePolicyConfig.INDEX_COMPOUND_FORMAT,
random.nextBoolean() ? random.nextDouble() : random.nextBoolean());
}
Class<? extends MergePolicyProvider<?>> mergePolicy = TieredMergePolicyProvider.class;
switch (random.nextInt(5)) {
case 4:
mergePolicy = LogByteSizeMergePolicyProvider.class;
break;
case 3:
mergePolicy = LogDocMergePolicyProvider.class;
break;
case 0:
mergePolicy = null;
}
if (mergePolicy != null) {
builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, mergePolicy.getName());
}
switch (random.nextInt(4)) {
case 3:
final int maxThreadCount = RandomInts.randomIntBetween(random, 1, 4);

View File

@ -1,46 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.index.merge;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
import org.elasticsearch.index.store.Store;
/**
* {@link org.elasticsearch.index.merge.policy.MergePolicyProvider} for lucenes {@link org.apache.lucene.index.NoMergePolicy}
*/
public class NoMergePolicyProvider extends AbstractMergePolicyProvider<MergePolicy> {
@Inject
public NoMergePolicyProvider(Store store) {
super(store);
}
@Override
public MergePolicy getMergePolicy() {
return NoMergePolicy.INSTANCE;
}
@Override
public void close() {}
}

View File

@ -36,11 +36,10 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.junit.Test;
import java.util.ArrayList;
@ -639,7 +638,7 @@ public class UpdateTests extends ElasticsearchIntegrationTest {
.startObject("_ttl").field("enabled", true).endObject()
.endObject()
.endObject())
.setSettings(Settings.builder().put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)));
.setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)));
ensureGreen();
final int numberOfThreads = scaledRandomIntBetween(3,5);
@ -1374,7 +1373,7 @@ public class UpdateTests extends ElasticsearchIntegrationTest {
"type1",
XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_timestamp").field("enabled", true)
.field("store", "yes").endObject().startObject("_ttl").field("enabled", true).endObject().endObject().endObject())
.setSettings(Settings.builder().put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)));
.setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)));
ensureGreen();
final int numberOfThreads = scaledRandomIntBetween(3, 5);

View File

@ -9,26 +9,12 @@ where the index data is stored, and are immutable up to delete markers.
Segments are, periodically, merged into larger segments to keep the
index size at bay and expunge deletes.
[float]
[[policy]]
=== Policy
The index merge policy module allows one to control which segments of a
shard index are to be merged. There are several types of policies with
the default set to `tiered`.
[float]
[[tiered]]
==== tiered
Merges segments of approximately equal size, subject to an allowed
number of segments per tier. This is similar to `log_bytes_size` merge
policy, except this merge policy is able to merge non-adjacent segment,
and separates how many segments are merged at once from how many
segments are allowed per tier. This merge policy also does not
over-merge (i.e., cascade merges).
number of segments per tier. The merge policy is able to merge
non-adjacent segments, and separates how many segments are merged at once from how many
segments are allowed per tier. It also does not over-merge (i.e., cascade merges).
This policy has the following settings:
The merge policy has the following settings:
`index.merge.policy.expunge_deletes_allowed`::
@ -72,7 +58,7 @@ This policy has the following settings:
Higher values favor selecting merges that reclaim deletions. A value of
`0.0` means deletions don't impact merge selection. Defaults to `2.0`.
For normal merging, this policy first computes a "budget" of how many
For normal merging, the policy first computes a "budget" of how many
segments are allowed to be in the index. If the index is over-budget,
then the policy sorts segments by decreasing size (proportionally considering percent
deletes), and then finds the least-cost merge. Merge cost is measured by
@ -92,83 +78,6 @@ segments to be in an index, and causing searches to be slower. Use the
indices segments API to see the segments that an index has, and
possibly either increase the `max_merged_segment` or issue an optimize
call for the index (try and aim to issue it on a low traffic time).
[float]
[[log-byte-size]]
==== log_byte_size
A merge policy that merges segments into levels of exponentially
increasing *byte size*, where each level has fewer segments than the
value of the merge factor. Whenever extra segments (beyond the merge
factor upper bound) are encountered, all segments within the level are
merged.
This policy has the following settings:
[cols="<,<",options="header",]
|=======================================================================
|Setting |Description
|index.merge.policy.merge_factor |Determines how often segment indices
are merged by index operation. With smaller values, less RAM is used
while indexing, and searches on unoptimized indices are faster, but
indexing speed is slower. With larger values, more RAM is used during
indexing, and while searches on unoptimized indices are slower, indexing
is faster. Thus larger values (greater than 10) are best for batch index
creation, and smaller values (lower than 10) for indices that are
interactively maintained. Defaults to `10`.
|index.merge.policy.min_merge_size |A size setting type which sets the
minimum size for the lowest level segments. Any segments below this size
are considered to be on the same level (even if they vary drastically in
size) and will be merged whenever there are mergeFactor of them. This
effectively truncates the "long tail" of small segments that would
otherwise be created into a single level. If you set this too large, it
could greatly increase the merging cost during indexing (if you flush
many small segments). Defaults to `1.6mb`
|index.merge.policy.max_merge_size |A size setting type which sets the
largest segment (measured by total byte size of the segment's files)
that may be merged with other segments. Defaults to unbounded.
|index.merge.policy.max_merge_docs |Determines the largest segment
(measured by document count) that may be merged with other segments.
Defaults to unbounded.
|=======================================================================
[float]
[[log-doc]]
==== log_doc
A merge policy that tries to merge segments into levels of exponentially
increasing *document count*, where each level has fewer segments than
the value of the merge factor. Whenever extra segments (beyond the merge
factor upper bound) are encountered, all segments within the level are
merged.
[cols="<,<",options="header",]
|=======================================================================
|Setting |Description
|index.merge.policy.merge_factor |Determines how often segment indices
are merged by index operation. With smaller values, less RAM is used
while indexing, and searches on unoptimized indices are faster, but
indexing speed is slower. With larger values, more RAM is used during
indexing, and while searches on unoptimized indices are slower, indexing
is faster. Thus larger values (greater than 10) are best for batch index
creation, and smaller values (lower than 10) for indices that are
interactively maintained. Defaults to `10`.
|index.merge.policy.min_merge_docs |Sets the minimum size for the lowest
level segments. Any segments below this size are considered to be on the
same level (even if they vary drastically in size) and will be merged
whenever there are mergeFactor of them. This effectively truncates the
"long tail" of small segments that would otherwise be created into a
single level. If you set this too large, it could greatly increase the
merging cost during indexing (if you flush many small segments).
Defaults to `1000`.
|index.merge.policy.max_merge_docs |Determines the largest segment
(measured by document count) that may be merged with other segments.
Defaults to unbounded.
|=======================================================================
[float]