Expose 'index.compound_on_flush' via engine settings

Lucene 4.4 shipped with a fundamental change in how the decision
on when to write compound files is made. During segment flush the
compound files are written by default which solely relies on a flag
in the IndexWriterConfig. The merge policy has been factored out to
only make decisions on merges and not on IW flushes. The default now
is always writing CFS on flush to reduce resource usage like open files
etc. if segments are flushed regularly. While providing a senseable
default certain users / usecases might need to change this setting if
re-packing flushed segments into CFS is not desired.

Closes #3461
This commit is contained in:
Simon Willnauer 2013-08-08 10:37:26 +02:00
parent 04b23a8fab
commit 4e2b9ff2ad
3 changed files with 81 additions and 59 deletions

View File

@ -91,6 +91,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private volatile int termIndexInterval;
private volatile int termIndexDivisor;
private volatile int indexConcurrency;
private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis;
private volatile boolean enableGcDeletes = true;
private volatile String codecName;
@ -183,7 +185,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.analysisService = analysisService;
this.similarityService = similarityService;
this.codecService = codecService;
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
@ -1292,6 +1294,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
config.setReaderTermsIndexDivisor(termIndexDivisor);
config.setMaxThreadStates(indexConcurrency);
config.setCodec(codecService.codec(codecName));
config.setUseCompoundFile(this.compoundOnFlush);
indexWriter = new IndexWriter(store.directory(), config);
} catch (IOException e) {
safeClose(indexWriter);
@ -1303,10 +1306,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
public static final String INDEX_TERM_INDEX_INTERVAL = "index.term_index_interval";
public static final String INDEX_TERM_INDEX_DIVISOR = "index.term_index_divisor";
public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency";
public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
public static final String INDEX_GC_DELETES = "index.gc_deletes";
public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis)).millis();
@ -1315,6 +1320,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
RobinEngine.this.gcDeletesInMillis = gcDeletesInMillis;
}
final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, RobinEngine.this.compoundOnFlush);
if (compoundOnFlush != RobinEngine.this.compoundOnFlush) {
logger.info("updating {} from [{}] to [{}]", RobinEngine.INDEX_COMPOUND_ON_FLUSH, RobinEngine.this.compoundOnFlush, compoundOnFlush);
RobinEngine.this.compoundOnFlush = compoundOnFlush;
indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
}
int termIndexInterval = settings.getAsInt(INDEX_TERM_INDEX_INTERVAL, RobinEngine.this.termIndexInterval);
int termIndexDivisor = settings.getAsInt(INDEX_TERM_INDEX_DIVISOR, RobinEngine.this.termIndexDivisor); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, RobinEngine.this.indexConcurrency);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.test.unit.index.engine;
package org.elasticsearch.test.unit.index.engine.robin;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
@ -30,13 +30,19 @@ import org.apache.lucene.search.TermQuery;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
@ -46,13 +52,16 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.integration.ElasticsearchTestCase;
import org.elasticsearch.test.unit.index.deletionpolicy.SnapshotIndexCommitExistsMatcher;
import org.elasticsearch.test.unit.index.engine.EngineSearcherTotalHitsMatcher;
import org.elasticsearch.test.unit.index.translog.TranslogSizeMatcher;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert;
@ -71,15 +80,12 @@ import java.util.concurrent.Future;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
*
*/
public abstract class AbstractSimpleEngineTests {
public class RobinEngineTests extends ElasticsearchTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
@ -91,16 +97,28 @@ public abstract class AbstractSimpleEngineTests {
protected Engine engine;
protected Engine replicaEngine;
private IndexSettingsService engineSettingsService;
private IndexSettingsService replicaSettingsService;
private Settings defaultSettings;
@Before
public void setUp() throws Exception {
defaultSettings = ImmutableSettings.builder()
.put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
.build(); // TODO randomize more settings
threadPool = new ThreadPool();
store = createStore();
store.deleteContent();
storeReplica = createStoreReplica();
storeReplica.deleteContent();
engine = createEngine(store, createTranslog());
engineSettingsService = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
engine = createEngine(engineSettingsService, store, createTranslog());
engine.start();
replicaEngine = createEngine(storeReplica, createTranslogReplica());
replicaSettingsService = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
replicaEngine = createEngine(replicaSettingsService, storeReplica, createTranslogReplica());
replicaEngine.start();
}
@ -170,8 +188,10 @@ public abstract class AbstractSimpleEngineTests {
return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
}
protected abstract Engine createEngine(Store store, Translog translog);
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog) {
return new RobinEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
}
protected static final BytesReference B_1 = new BytesArray(new byte[]{1});
protected static final BytesReference B_2 = new BytesArray(new byte[]{2});
protected static final BytesReference B_3 = new BytesArray(new byte[]{3});
@ -180,6 +200,7 @@ public abstract class AbstractSimpleEngineTests {
public void testSegments() throws Exception {
List<Segment> segments = engine.segments();
assertThat(segments.isEmpty(), equalTo(true));
final boolean defaultCompound = defaultSettings.getAsBoolean(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true);
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
@ -195,6 +216,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
engine.flush(new Engine.Flush());
@ -204,7 +226,9 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, false).build());
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
@ -217,11 +241,15 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(1).isCommitted(), equalTo(false));
assertThat(segments.get(1).isSearch(), equalTo(true));
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
engine.delete(new Engine.Delete("test", "1", newUid("1")));
engine.refresh(new Engine.Refresh(true));
@ -233,13 +261,41 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(1));
assertThat(segments.get(0).getDeletedDocs(), equalTo(1));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(1).isCommitted(), equalTo(false));
assertThat(segments.get(1).isSearch(), equalTo(true));
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
}
assertThat(segments.get(1).isCompound(), equalTo(false));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true).build());
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("4"), doc4));
engine.refresh(new Engine.Refresh(true));
segments = engine.segments();
assertThat(segments.size(), equalTo(3));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(1));
assertThat(segments.get(0).getDeletedDocs(), equalTo(1));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(1).isCommitted(), equalTo(false));
assertThat(segments.get(1).isSearch(), equalTo(true));
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
assertThat(segments.get(2).isCommitted(), equalTo(false));
assertThat(segments.get(2).isSearch(), equalTo(true));
assertThat(segments.get(2).getNumDocs(), equalTo(1));
assertThat(segments.get(2).getDeletedDocs(), equalTo(0));
assertThat(segments.get(2).isCompound(), equalTo(true));
}
@Test
public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = engine.searcher();

View File

@ -1,46 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.index.engine.robin;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
/**
*
*/
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
protected Engine createEngine(Store store, Translog translog) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
return new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
}
}