Force merges to not happen when indexing a doc / flush

Today, even though our merge policy doesn't return new merge specs on SEGMENT_FLUSH, merge on the scheduler is still called on flush time, and can cause merges to stall indexing during merges. Both for the concurrent merge scheduler (the default) and the serial merge scheduler. This behavior become worse when throttling kicks in (today at 20mb per sec).

In order to solve it (outside of Lucene for now), we wrap the merge scheduler with an EnableMergeScheduler, where, on the thread level, using a thread local, the call to merge can be enabled/disabled.

A Merges helper class is added where all explicit merges operations should go through. If the scheduler is the enabled one, it will enable merges before calling the relevant explicit method call. In order to make sure Merges is the only class that calls the explicit merge calls, the IW variant of them is added to the forbidden APIs list.

closes #5319
This commit is contained in:
Shay Banon 2014-03-05 12:26:26 +00:00
parent 23471cd72c
commit 992747a159
17 changed files with 275 additions and 33 deletions

View File

@ -24,3 +24,9 @@ org.apache.lucene.util.RamUsageEstimator#sizeOf(java.lang.Object) @ This can be
org.apache.lucene.index.IndexReader#decRef() org.apache.lucene.index.IndexReader#decRef()
org.apache.lucene.index.IndexReader#incRef() org.apache.lucene.index.IndexReader#incRef()
org.apache.lucene.index.IndexReader#tryIncRef() org.apache.lucene.index.IndexReader#tryIncRef()
org.apache.lucene.index.IndexWriter#maybeMerge() @ use Merges#maybeMerge
org.apache.lucene.index.IndexWriter#forceMerge(int) @ use Merges#forceMerge
org.apache.lucene.index.IndexWriter#forceMerge(int,boolean) @ use Merges#forceMerge
org.apache.lucene.index.IndexWriter#forceMergeDeletes() @ use Merges#forceMergeDeletes
org.apache.lucene.index.IndexWriter#forceMergeDeletes(boolean) @ use Merges#forceMergeDeletes

View File

@ -990,6 +990,7 @@
<exclude>org/apache/lucene/search/XReferenceManager.class</exclude> <exclude>org/apache/lucene/search/XReferenceManager.class</exclude>
<exclude>org/apache/lucene/search/XSearcherManager.class</exclude> <exclude>org/apache/lucene/search/XSearcherManager.class</exclude>
<exclude>org/elasticsearch/index/percolator/stats/ShardPercolateService$RamEstimator.class</exclude> <exclude>org/elasticsearch/index/percolator/stats/ShardPercolateService$RamEstimator.class</exclude>
<exclude>org/elasticsearch/index/merge/Merges.class</exclude>
<!-- end excludes for valid system-out --> <!-- end excludes for valid system-out -->
<!-- start excludes for Unsafe --> <!-- start excludes for Unsafe -->
<exclude>org/elasticsearch/common/util/UnsafeUtils.class</exclude> <exclude>org/elasticsearch/common/util/UnsafeUtils.class</exclude>

View File

@ -56,6 +56,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*; import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy; import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider;
@ -947,7 +948,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
ensureOpen(); ensureOpen();
indexWriter.maybeMerge(); Merges.maybeMerge(indexWriter);
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
failEngine(e); failEngine(e);
throw new OptimizeFailedEngineException(shardId, e); throw new OptimizeFailedEngineException(shardId, e);
@ -973,12 +974,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
try { try {
ensureOpen(); ensureOpen();
if (optimize.onlyExpungeDeletes()) { if (optimize.onlyExpungeDeletes()) {
indexWriter.forceMergeDeletes(false); Merges.forceMergeDeletes(indexWriter, false);
} else if (optimize.maxNumSegments() <= 0) { } else if (optimize.maxNumSegments() <= 0) {
indexWriter.maybeMerge(); Merges.maybeMerge(indexWriter);
possibleMergeNeeded = false; possibleMergeNeeded = false;
} else { } else {
indexWriter.forceMerge(optimize.maxNumSegments(), false); Merges.forceMerge(indexWriter, optimize.maxNumSegments(), false);
} }
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
failEngine(e); failEngine(e);

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
import java.io.IOException;
/**
* A wrapper of another {@link org.apache.lucene.index.MergeScheduler} that allows
* to explicitly enable merge and disable on a thread local basis. The default is
* to have merges disabled.
* <p/>
* This merge scheduler can be used to get around the fact that even though a merge
* policy can control that no new merges will be created as a result of a segment flush
* (during indexing operation for example), the {@link #merge(org.apache.lucene.index.IndexWriter)}
* call will still be called, and can result in stalling indexing.
*/
public class EnableMergeScheduler extends MergeScheduler {
private final MergeScheduler mergeScheduler;
private final ThreadLocal<Boolean> enabled = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
public EnableMergeScheduler(MergeScheduler mergeScheduler) {
this.mergeScheduler = mergeScheduler;
}
/**
* Enable merges on the current thread.
*/
void enableMerge() {
assert !enabled.get();
enabled.set(Boolean.TRUE);
}
/**
* Disable merges on the current thread.
*/
void disableMerge() {
assert enabled.get();
enabled.set(Boolean.FALSE);
}
@Override
public void merge(IndexWriter writer) throws IOException {
if (enabled.get()) {
mergeScheduler.merge(writer);
}
}
@Override
public void close() throws IOException {
mergeScheduler.close();
}
@Override
public MergeScheduler clone() {
// Lucene IW makes a clone internally but since we hold on to this instance
// the clone will just be the identity.
return this;
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
import java.io.IOException;
/**
* A helper to execute explicit merges of the {@link org.apache.lucene.index.IndexWriter} APIs. It
* holds additional logic which in case the merge scheduler is an {@link org.elasticsearch.index.merge.EnableMergeScheduler}
* then merges are explicitly enabled and disabled back at the end.
* <p/>
* In our codebase, at least until we can somehow use this logic in Lucene IW itself, we should only use
* this class to execute explicit merges. The explicit merge calls have been added to the forbidden APIs
* list to make sure we don't call them unless we use this class.
*/
public class Merges {
/**
* See {@link org.apache.lucene.index.IndexWriter#maybeMerge()}, with the additional
* logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}.
*/
public static void maybeMerge(IndexWriter writer) throws IOException {
MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler();
if (mergeScheduler instanceof EnableMergeScheduler) {
((EnableMergeScheduler) mergeScheduler).enableMerge();
try {
writer.maybeMerge();
} finally {
((EnableMergeScheduler) mergeScheduler).disableMerge();
}
} else {
writer.maybeMerge();
}
}
/**
* See {@link org.apache.lucene.index.IndexWriter#forceMerge(int)}, with the additional
* logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}.
*/
public static void forceMerge(IndexWriter writer, int maxNumSegments) throws IOException {
forceMerge(writer, maxNumSegments, true);
}
/**
* See {@link org.apache.lucene.index.IndexWriter#forceMerge(int, boolean)}, with the additional
* logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}.
*/
public static void forceMerge(IndexWriter writer, int maxNumSegments, boolean doWait) throws IOException {
MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler();
if (mergeScheduler instanceof EnableMergeScheduler) {
((EnableMergeScheduler) mergeScheduler).enableMerge();
try {
writer.forceMerge(maxNumSegments, doWait);
} finally {
((EnableMergeScheduler) mergeScheduler).disableMerge();
}
} else {
writer.forceMerge(maxNumSegments, doWait);
}
}
/**
* See {@link org.apache.lucene.index.IndexWriter#forceMergeDeletes()}, with the additional
* logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}.
*/
public static void forceMergeDeletes(IndexWriter writer) throws IOException {
forceMergeDeletes(writer, true);
}
/**
* See {@link org.apache.lucene.index.IndexWriter#forceMergeDeletes(boolean)}, with the additional
* logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}.
*/
public static void forceMergeDeletes(IndexWriter writer, boolean doWait) throws IOException {
MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler();
if (mergeScheduler instanceof EnableMergeScheduler) {
((EnableMergeScheduler) mergeScheduler).enableMerge();
try {
writer.forceMergeDeletes(doWait);
} finally {
((EnableMergeScheduler) mergeScheduler).disableMerge();
}
} else {
writer.forceMergeDeletes(doWait);
}
}
}

View File

@ -59,7 +59,7 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
} }
@Override @Override
public MergeScheduler newMergeScheduler() { public MergeScheduler buildMergeScheduler() {
CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this); CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
schedulers.add(concurrentMergeScheduler); schedulers.add(concurrentMergeScheduler);

View File

@ -22,16 +22,13 @@ package org.elasticsearch.index.merge.scheduler;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import static org.elasticsearch.index.merge.scheduler.MergeSchedulerModule.MergeSchedulerSettings.TYPE;
/** /**
* *
*/ */
public class MergeSchedulerModule extends AbstractModule { public class MergeSchedulerModule extends AbstractModule {
public static class MergeSchedulerSettings { public static final String MERGE_SCHEDULER_TYPE_KEY = "index.merge.scheduler.type";
public static final String TYPE = "index.merge.scheduler.type"; public static final Class<? extends MergeSchedulerProvider> DEFAULT = ConcurrentMergeSchedulerProvider.class;
}
private final Settings settings; private final Settings settings;
@ -42,7 +39,7 @@ public class MergeSchedulerModule extends AbstractModule {
@Override @Override
protected void configure() { protected void configure() {
bind(MergeSchedulerProvider.class) bind(MergeSchedulerProvider.class)
.to(settings.getAsClass(TYPE, ConcurrentMergeSchedulerProvider.class, "org.elasticsearch.index.merge.scheduler.", "MergeSchedulerProvider")) .to(settings.getAsClass(MERGE_SCHEDULER_TYPE_KEY, DEFAULT, "org.elasticsearch.index.merge.scheduler.", "MergeSchedulerProvider"))
.asEagerSingleton(); .asEagerSingleton();
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.merge.scheduler;
import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler; import org.apache.lucene.index.MergeScheduler;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.EnableMergeScheduler;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
@ -36,7 +37,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* *
*/ */
public abstract class MergeSchedulerProvider<T extends MergeScheduler> extends AbstractIndexShardComponent implements IndexShardComponent { public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent {
public static final String FORCE_ASYNC_MERGE = "index.merge.force_async_merge";
public static interface FailureListener { public static interface FailureListener {
void onFailedMerge(MergePolicy.MergeException e); void onFailedMerge(MergePolicy.MergeException e);
@ -110,7 +113,16 @@ public abstract class MergeSchedulerProvider<T extends MergeScheduler> extends A
} }
} }
public abstract T newMergeScheduler(); public final MergeScheduler newMergeScheduler() {
MergeScheduler scheduler = buildMergeScheduler();
// an internal settings, that would allow us to disable this behavior if really needed
if (indexSettings.getAsBoolean(FORCE_ASYNC_MERGE, true)) {
scheduler = new EnableMergeScheduler(scheduler);
}
return scheduler;
}
protected abstract MergeScheduler buildMergeScheduler();
public abstract MergeStats stats(); public abstract MergeStats stats();

View File

@ -48,7 +48,7 @@ public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {
} }
@Override @Override
public MergeScheduler newMergeScheduler() { public MergeScheduler buildMergeScheduler() {
CustomSerialMergeScheduler scheduler = new CustomSerialMergeScheduler(logger, this); CustomSerialMergeScheduler scheduler = new CustomSerialMergeScheduler(logger, this);
schedulers.add(scheduler); schedulers.add(scheduler);
return scheduler; return scheduler;

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.core.LongFieldMapper; import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import java.util.Random; import java.util.Random;
@ -51,6 +52,7 @@ public class LongFieldDataBenchmark {
public int numValues() { public int numValues() {
return 1; return 1;
} }
@Override @Override
public long nextValue() { public long nextValue() {
return RANDOM.nextInt(16); return RANDOM.nextInt(16);
@ -60,6 +62,7 @@ public class LongFieldDataBenchmark {
public int numValues() { public int numValues() {
return 1; return 1;
} }
@Override @Override
public long nextValue() { public long nextValue() {
// somewhere in-between 2010 and 2012 // somewhere in-between 2010 and 2012
@ -70,6 +73,7 @@ public class LongFieldDataBenchmark {
public int numValues() { public int numValues() {
return RANDOM.nextInt(3); return RANDOM.nextInt(3);
} }
@Override @Override
public long nextValue() { public long nextValue() {
// somewhere in-between 2010 and 2012 // somewhere in-between 2010 and 2012
@ -80,6 +84,7 @@ public class LongFieldDataBenchmark {
public int numValues() { public int numValues() {
return RANDOM.nextInt(3); return RANDOM.nextInt(3);
} }
@Override @Override
public long nextValue() { public long nextValue() {
return 3 + RANDOM.nextInt(8); return 3 + RANDOM.nextInt(8);
@ -89,6 +94,7 @@ public class LongFieldDataBenchmark {
public int numValues() { public int numValues() {
return RANDOM.nextFloat() < 0.1f ? 1 : 0; return RANDOM.nextFloat() < 0.1f ? 1 : 0;
} }
@Override @Override
public long nextValue() { public long nextValue() {
return RANDOM.nextLong(); return RANDOM.nextLong();
@ -98,6 +104,7 @@ public class LongFieldDataBenchmark {
public int numValues() { public int numValues() {
return RANDOM.nextFloat() < 0.1f ? 1 + RANDOM.nextInt(5) : 0; return RANDOM.nextFloat() < 0.1f ? 1 + RANDOM.nextInt(5) : 0;
} }
@Override @Override
public long nextValue() { public long nextValue() {
return RANDOM.nextLong(); return RANDOM.nextLong();
@ -107,12 +114,15 @@ public class LongFieldDataBenchmark {
public int numValues() { public int numValues() {
return 1 + RANDOM.nextInt(3); return 1 + RANDOM.nextInt(3);
} }
@Override @Override
public long nextValue() { public long nextValue() {
return RANDOM.nextLong(); return RANDOM.nextLong();
} }
}; };
public abstract int numValues(); public abstract int numValues();
public abstract long nextValue(); public abstract long nextValue();
} }
@ -132,7 +142,7 @@ public class LongFieldDataBenchmark {
} }
indexWriter.addDocument(doc); indexWriter.addDocument(doc);
} }
indexWriter.forceMerge(1); Merges.forceMerge(indexWriter, 1);
indexWriter.close(); indexWriter.close();
final DirectoryReader dr = DirectoryReader.open(dir); final DirectoryReader dr = DirectoryReader.open(dir);

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy; import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
@ -266,7 +267,7 @@ public class VersionsTests extends ElasticsearchLuceneTestCase {
.put("1", 0L).put("2", 0L).put("3", 0L).put("4", 4L).put("5", 5L).put("6", 6L).build(); .put("1", 0L).put("2", 0L).put("3", 0L).put("4", 4L).put("5", 5L).put("6", 6L).build();
// Force merge and check versions // Force merge and check versions
iw.forceMerge(1); Merges.forceMerge(iw, 1);
final AtomicReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory())); final AtomicReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory()));
final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME); final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME);
assertThat(versions, notNullValue()); assertThat(versions, notNullValue());

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat; import org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat;
import org.elasticsearch.index.codec.postingsformat.Elasticsearch090PostingsFormat; import org.elasticsearch.index.codec.postingsformat.Elasticsearch090PostingsFormat;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test; import org.junit.Test;
@ -93,7 +94,7 @@ public class DefaultPostingsFormatTests extends ElasticsearchTestCase {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
writer.addDocument(Arrays.asList(new TextField("foo", "foo bar foo bar", Store.YES), new TextField("some_other_field", "1234", Store.YES))); writer.addDocument(Arrays.asList(new TextField("foo", "foo bar foo bar", Store.YES), new TextField("some_other_field", "1234", Store.YES)));
} }
writer.forceMerge(1); Merges.forceMerge(writer, 1);
writer.commit(); writer.commit();
DirectoryReader reader = DirectoryReader.open(writer, false); DirectoryReader reader = DirectoryReader.open(writer, false);

View File

@ -185,7 +185,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS)); return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS));
} }
protected MergeSchedulerProvider<?> createMergeScheduler() { protected MergeSchedulerProvider createMergeScheduler() {
return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool); return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
} }
@ -193,7 +193,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
return createEngine(indexSettingsService, store, translog, createMergeScheduler()); return createEngine(indexSettingsService, store, translog, createMergeScheduler());
} }
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider<?> mergeSchedulerProvider) { protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
return new InternalEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider, return new InternalEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index())); new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
} }

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.fielddata.AtomicFieldData.WithOrdinals; import org.elasticsearch.index.fielddata.AtomicFieldData.WithOrdinals;
import org.elasticsearch.index.fielddata.ScriptDocValues.Strings; import org.elasticsearch.index.fielddata.ScriptDocValues.Strings;
import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs;
import org.elasticsearch.index.merge.Merges;
import org.junit.Test; import org.junit.Test;
import java.util.Random; import java.util.Random;
@ -60,7 +61,7 @@ public class FilterFieldDataTest extends AbstractFieldDataTests {
} }
writer.addDocument(d); writer.addDocument(d);
} }
writer.forceMerge(1); Merges.forceMerge(writer, 1);
AtomicReaderContext context = refreshReader(); AtomicReaderContext context = refreshReader();
String[] formats = new String[] { "fst", "paged_bytes"}; String[] formats = new String[] { "fst", "paged_bytes"};
@ -162,8 +163,8 @@ public class FilterFieldDataTest extends AbstractFieldDataTests {
} }
writer.addDocument(d); writer.addDocument(d);
} }
System.out.println(hundred + " " + ten + " " +five); System.out.println(hundred + " " + ten + " " + five);
writer.forceMerge(1); Merges.forceMerge(writer, 1);
AtomicReaderContext context = refreshReader(); AtomicReaderContext context = refreshReader();
String[] formats = new String[] { "fst", "paged_bytes"}; String[] formats = new String[] { "fst", "paged_bytes"};
for (String format : formats) { for (String format : formats) {

View File

@ -28,6 +28,7 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.fielddata.plain.PackedArrayAtomicFieldData; import org.elasticsearch.index.fielddata.plain.PackedArrayAtomicFieldData;
import org.elasticsearch.index.merge.Merges;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.junit.Test; import org.junit.Test;
@ -325,7 +326,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
} }
writer.addDocument(doc); writer.addDocument(doc);
} }
writer.forceMerge(1); Merges.forceMerge(writer, 1);
final IndexNumericFieldData indexFieldData = getForField("value"); final IndexNumericFieldData indexFieldData = getForField("value");
final AtomicNumericFieldData atomicFieldData = indexFieldData.load(refreshReader()); final AtomicNumericFieldData atomicFieldData = indexFieldData.load(refreshReader());

View File

@ -41,6 +41,7 @@ import org.elasticsearch.index.codec.postingsformat.PreBuiltPostingsFormatProvid
import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.core.AbstractFieldMapper; import org.elasticsearch.index.mapper.core.AbstractFieldMapper;
import org.elasticsearch.index.mapper.core.CompletionFieldMapper; import org.elasticsearch.index.mapper.core.CompletionFieldMapper;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.search.suggest.SuggestUtils; import org.elasticsearch.search.suggest.SuggestUtils;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat.LookupFactory; import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat.LookupFactory;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
@ -265,7 +266,7 @@ public class CompletionPostingsFormatTest extends ElasticsearchTestCase {
writer.addDocument(doc); writer.addDocument(doc);
} }
writer.commit(); writer.commit();
writer.forceMerge(1); Merges.forceMerge(writer, 1);
writer.commit(); writer.commit();
DirectoryReader reader = DirectoryReader.open(writer, true); DirectoryReader reader = DirectoryReader.open(writer, true);
assertThat(reader.leaves().size(), equalTo(1)); assertThat(reader.leaves().size(), equalTo(1));

View File

@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse;
@ -61,6 +60,10 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.FieldMapper.Loading; import org.elasticsearch.index.mapper.FieldMapper.Loading;
import org.elasticsearch.index.merge.policy.*; import org.elasticsearch.index.merge.policy.*;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryMissingException;
@ -295,7 +298,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
client().admin().indices().preparePutTemplate("random_index_template") client().admin().indices().preparePutTemplate("random_index_template")
.setTemplate("*") .setTemplate("*")
.setOrder(0) .setOrder(0)
.setSettings(setRandomNormsLoading(setRandomMergePolicy(getRandom(), ImmutableSettings.builder()) .setSettings(setRandomNormsLoading(setRandomMerge(getRandom(), ImmutableSettings.builder())
.put(INDEX_SEED_SETTING, randomLong()))) .put(INDEX_SEED_SETTING, randomLong())))
.execute().actionGet(); .execute().actionGet();
} }
@ -308,24 +311,38 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return builder; return builder;
} }
private static ImmutableSettings.Builder setRandomMergePolicy(Random random, ImmutableSettings.Builder builder) { private static ImmutableSettings.Builder setRandomMerge(Random random, ImmutableSettings.Builder builder) {
if (random.nextBoolean()) { if (random.nextBoolean()) {
builder.put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT, builder.put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT,
random.nextBoolean() ? random.nextDouble() : random.nextBoolean()); random.nextBoolean() ? random.nextDouble() : random.nextBoolean());
} }
Class<? extends MergePolicyProvider<?>> clazz = TieredMergePolicyProvider.class; Class<? extends MergePolicyProvider<?>> mergePolicy = TieredMergePolicyProvider.class;
switch (random.nextInt(5)) { switch (random.nextInt(5)) {
case 4: case 4:
clazz = LogByteSizeMergePolicyProvider.class; mergePolicy = LogByteSizeMergePolicyProvider.class;
break; break;
case 3: case 3:
clazz = LogDocMergePolicyProvider.class; mergePolicy = LogDocMergePolicyProvider.class;
break; break;
case 0: case 0:
return builder; // don't set the setting at all mergePolicy = null;
} }
assert clazz != null; if (mergePolicy != null) {
builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, clazz.getName()); builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, mergePolicy.getName());
}
if (random.nextBoolean()) {
builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean());
}
switch (random.nextInt(5)) {
case 4:
builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, SerialMergeSchedulerProvider.class.getName());
break;
case 3:
builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class.getName());
break;
}
return builder; return builder;
} }