From 992747a159ced8ec9c3eb4d2b99b852ce8f626c1 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 5 Mar 2014 12:26:26 +0000 Subject: [PATCH] Force merges to not happen when indexing a doc / flush Today, even though our merge policy doesn't return new merge specs on SEGMENT_FLUSH, merge on the scheduler is still called on flush time, and can cause merges to stall indexing during merges. Both for the concurrent merge scheduler (the default) and the serial merge scheduler. This behavior become worse when throttling kicks in (today at 20mb per sec). In order to solve it (outside of Lucene for now), we wrap the merge scheduler with an EnableMergeScheduler, where, on the thread level, using a thread local, the call to merge can be enabled/disabled. A Merges helper class is added where all explicit merges operations should go through. If the scheduler is the enabled one, it will enable merges before calling the relevant explicit method call. In order to make sure Merges is the only class that calls the explicit merge calls, the IW variant of them is added to the forbidden APIs list. closes #5319 --- core-signatures.txt | 6 + pom.xml | 1 + .../index/engine/internal/InternalEngine.java | 9 +- .../index/merge/EnableMergeScheduler.java | 86 ++++++++++++++ .../org/elasticsearch/index/merge/Merges.java | 107 ++++++++++++++++++ .../ConcurrentMergeSchedulerProvider.java | 2 +- .../merge/scheduler/MergeSchedulerModule.java | 9 +- .../scheduler/MergeSchedulerProvider.java | 16 ++- .../SerialMergeSchedulerProvider.java | 2 +- .../fielddata/LongFieldDataBenchmark.java | 12 +- .../common/lucene/uid/VersionsTests.java | 3 +- .../DefaultPostingsFormatTests.java | 3 +- .../engine/internal/InternalEngineTests.java | 4 +- .../index/fielddata/FilterFieldDataTest.java | 7 +- .../index/fielddata/LongFieldDataTests.java | 3 +- .../CompletionPostingsFormatTest.java | 3 +- .../test/ElasticsearchIntegrationTest.java | 35 ++++-- 17 files changed, 275 insertions(+), 33 deletions(-) create mode 100644 src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java create mode 100644 src/main/java/org/elasticsearch/index/merge/Merges.java diff --git a/core-signatures.txt b/core-signatures.txt index b2806ef3e24..0af12bd9693 100644 --- a/core-signatures.txt +++ b/core-signatures.txt @@ -24,3 +24,9 @@ org.apache.lucene.util.RamUsageEstimator#sizeOf(java.lang.Object) @ This can be org.apache.lucene.index.IndexReader#decRef() org.apache.lucene.index.IndexReader#incRef() org.apache.lucene.index.IndexReader#tryIncRef() + +org.apache.lucene.index.IndexWriter#maybeMerge() @ use Merges#maybeMerge +org.apache.lucene.index.IndexWriter#forceMerge(int) @ use Merges#forceMerge +org.apache.lucene.index.IndexWriter#forceMerge(int,boolean) @ use Merges#forceMerge +org.apache.lucene.index.IndexWriter#forceMergeDeletes() @ use Merges#forceMergeDeletes +org.apache.lucene.index.IndexWriter#forceMergeDeletes(boolean) @ use Merges#forceMergeDeletes diff --git a/pom.xml b/pom.xml index 691299abce7..9fb0c24cfac 100644 --- a/pom.xml +++ b/pom.xml @@ -990,6 +990,7 @@ org/apache/lucene/search/XReferenceManager.class org/apache/lucene/search/XSearcherManager.class org/elasticsearch/index/percolator/stats/ShardPercolateService$RamEstimator.class + org/elasticsearch/index/merge/Merges.class org/elasticsearch/common/util/UnsafeUtils.class diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 0a0e903c907..5764467f0ca 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -56,6 +56,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.*; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.merge.Merges; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy; import org.elasticsearch.index.merge.policy.MergePolicyProvider; @@ -947,7 +948,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin rwl.readLock().lock(); try { ensureOpen(); - indexWriter.maybeMerge(); + Merges.maybeMerge(indexWriter); } catch (OutOfMemoryError e) { failEngine(e); throw new OptimizeFailedEngineException(shardId, e); @@ -973,12 +974,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin try { ensureOpen(); if (optimize.onlyExpungeDeletes()) { - indexWriter.forceMergeDeletes(false); + Merges.forceMergeDeletes(indexWriter, false); } else if (optimize.maxNumSegments() <= 0) { - indexWriter.maybeMerge(); + Merges.maybeMerge(indexWriter); possibleMergeNeeded = false; } else { - indexWriter.forceMerge(optimize.maxNumSegments(), false); + Merges.forceMerge(indexWriter, optimize.maxNumSegments(), false); } } catch (OutOfMemoryError e) { failEngine(e); diff --git a/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java b/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java new file mode 100644 index 00000000000..090d49925a6 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.merge; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergeScheduler; + +import java.io.IOException; + +/** + * A wrapper of another {@link org.apache.lucene.index.MergeScheduler} that allows + * to explicitly enable merge and disable on a thread local basis. The default is + * to have merges disabled. + *

+ * This merge scheduler can be used to get around the fact that even though a merge + * policy can control that no new merges will be created as a result of a segment flush + * (during indexing operation for example), the {@link #merge(org.apache.lucene.index.IndexWriter)} + * call will still be called, and can result in stalling indexing. + */ +public class EnableMergeScheduler extends MergeScheduler { + + private final MergeScheduler mergeScheduler; + + private final ThreadLocal enabled = new ThreadLocal() { + @Override + protected Boolean initialValue() { + return Boolean.FALSE; + } + }; + + public EnableMergeScheduler(MergeScheduler mergeScheduler) { + this.mergeScheduler = mergeScheduler; + } + + /** + * Enable merges on the current thread. + */ + void enableMerge() { + assert !enabled.get(); + enabled.set(Boolean.TRUE); + } + + /** + * Disable merges on the current thread. + */ + void disableMerge() { + assert enabled.get(); + enabled.set(Boolean.FALSE); + } + + @Override + public void merge(IndexWriter writer) throws IOException { + if (enabled.get()) { + mergeScheduler.merge(writer); + } + } + + @Override + public void close() throws IOException { + mergeScheduler.close(); + } + + @Override + public MergeScheduler clone() { + // Lucene IW makes a clone internally but since we hold on to this instance + // the clone will just be the identity. + return this; + } +} diff --git a/src/main/java/org/elasticsearch/index/merge/Merges.java b/src/main/java/org/elasticsearch/index/merge/Merges.java new file mode 100644 index 00000000000..460ca7869a4 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/merge/Merges.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.merge; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergeScheduler; + +import java.io.IOException; + +/** + * A helper to execute explicit merges of the {@link org.apache.lucene.index.IndexWriter} APIs. It + * holds additional logic which in case the merge scheduler is an {@link org.elasticsearch.index.merge.EnableMergeScheduler} + * then merges are explicitly enabled and disabled back at the end. + *

+ * In our codebase, at least until we can somehow use this logic in Lucene IW itself, we should only use + * this class to execute explicit merges. The explicit merge calls have been added to the forbidden APIs + * list to make sure we don't call them unless we use this class. + */ +public class Merges { + + /** + * See {@link org.apache.lucene.index.IndexWriter#maybeMerge()}, with the additional + * logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}. + */ + public static void maybeMerge(IndexWriter writer) throws IOException { + MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler(); + if (mergeScheduler instanceof EnableMergeScheduler) { + ((EnableMergeScheduler) mergeScheduler).enableMerge(); + try { + writer.maybeMerge(); + } finally { + ((EnableMergeScheduler) mergeScheduler).disableMerge(); + } + } else { + writer.maybeMerge(); + } + } + + /** + * See {@link org.apache.lucene.index.IndexWriter#forceMerge(int)}, with the additional + * logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}. + */ + public static void forceMerge(IndexWriter writer, int maxNumSegments) throws IOException { + forceMerge(writer, maxNumSegments, true); + } + + /** + * See {@link org.apache.lucene.index.IndexWriter#forceMerge(int, boolean)}, with the additional + * logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}. + */ + public static void forceMerge(IndexWriter writer, int maxNumSegments, boolean doWait) throws IOException { + MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler(); + if (mergeScheduler instanceof EnableMergeScheduler) { + ((EnableMergeScheduler) mergeScheduler).enableMerge(); + try { + writer.forceMerge(maxNumSegments, doWait); + } finally { + ((EnableMergeScheduler) mergeScheduler).disableMerge(); + } + } else { + writer.forceMerge(maxNumSegments, doWait); + } + } + + /** + * See {@link org.apache.lucene.index.IndexWriter#forceMergeDeletes()}, with the additional + * logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}. + */ + public static void forceMergeDeletes(IndexWriter writer) throws IOException { + forceMergeDeletes(writer, true); + } + + /** + * See {@link org.apache.lucene.index.IndexWriter#forceMergeDeletes(boolean)}, with the additional + * logic of explicitly enabling merges if the scheduler is {@link org.elasticsearch.index.merge.EnableMergeScheduler}. + */ + public static void forceMergeDeletes(IndexWriter writer, boolean doWait) throws IOException { + MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler(); + if (mergeScheduler instanceof EnableMergeScheduler) { + ((EnableMergeScheduler) mergeScheduler).enableMerge(); + try { + writer.forceMergeDeletes(doWait); + } finally { + ((EnableMergeScheduler) mergeScheduler).disableMerge(); + } + } else { + writer.forceMergeDeletes(doWait); + } + } +} diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java index da5f502f512..7eb498dbeab 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java @@ -59,7 +59,7 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { } @Override - public MergeScheduler newMergeScheduler() { + public MergeScheduler buildMergeScheduler() { CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this); concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); schedulers.add(concurrentMergeScheduler); diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerModule.java b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerModule.java index a2edc9d0914..01cf42c3b85 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerModule.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerModule.java @@ -22,16 +22,13 @@ package org.elasticsearch.index.merge.scheduler; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; -import static org.elasticsearch.index.merge.scheduler.MergeSchedulerModule.MergeSchedulerSettings.TYPE; - /** * */ public class MergeSchedulerModule extends AbstractModule { - public static class MergeSchedulerSettings { - public static final String TYPE = "index.merge.scheduler.type"; - } + public static final String MERGE_SCHEDULER_TYPE_KEY = "index.merge.scheduler.type"; + public static final Class DEFAULT = ConcurrentMergeSchedulerProvider.class; private final Settings settings; @@ -42,7 +39,7 @@ public class MergeSchedulerModule extends AbstractModule { @Override protected void configure() { bind(MergeSchedulerProvider.class) - .to(settings.getAsClass(TYPE, ConcurrentMergeSchedulerProvider.class, "org.elasticsearch.index.merge.scheduler.", "MergeSchedulerProvider")) + .to(settings.getAsClass(MERGE_SCHEDULER_TYPE_KEY, DEFAULT, "org.elasticsearch.index.merge.scheduler.", "MergeSchedulerProvider")) .asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java index ef37ce1bf1e..2a3c06248aa 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.merge.scheduler; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergeScheduler; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.merge.EnableMergeScheduler; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.settings.IndexSettings; @@ -36,7 +37,9 @@ import java.util.concurrent.CopyOnWriteArrayList; /** * */ -public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent { +public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent { + + public static final String FORCE_ASYNC_MERGE = "index.merge.force_async_merge"; public static interface FailureListener { void onFailedMerge(MergePolicy.MergeException e); @@ -110,7 +113,16 @@ public abstract class MergeSchedulerProvider extends A } } - public abstract T newMergeScheduler(); + public final MergeScheduler newMergeScheduler() { + MergeScheduler scheduler = buildMergeScheduler(); + // an internal settings, that would allow us to disable this behavior if really needed + if (indexSettings.getAsBoolean(FORCE_ASYNC_MERGE, true)) { + scheduler = new EnableMergeScheduler(scheduler); + } + return scheduler; + } + + protected abstract MergeScheduler buildMergeScheduler(); public abstract MergeStats stats(); diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java index 84520f017e3..f78a413075e 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java @@ -48,7 +48,7 @@ public class SerialMergeSchedulerProvider extends MergeSchedulerProvider { } @Override - public MergeScheduler newMergeScheduler() { + public MergeScheduler buildMergeScheduler() { CustomSerialMergeScheduler scheduler = new CustomSerialMergeScheduler(logger, this); schedulers.add(scheduler); return scheduler; diff --git a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java index 4937f1b6c8b..72a7f7c1017 100644 --- a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.core.LongFieldMapper; +import org.elasticsearch.index.merge.Merges; import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import java.util.Random; @@ -51,6 +52,7 @@ public class LongFieldDataBenchmark { public int numValues() { return 1; } + @Override public long nextValue() { return RANDOM.nextInt(16); @@ -60,6 +62,7 @@ public class LongFieldDataBenchmark { public int numValues() { return 1; } + @Override public long nextValue() { // somewhere in-between 2010 and 2012 @@ -70,6 +73,7 @@ public class LongFieldDataBenchmark { public int numValues() { return RANDOM.nextInt(3); } + @Override public long nextValue() { // somewhere in-between 2010 and 2012 @@ -80,6 +84,7 @@ public class LongFieldDataBenchmark { public int numValues() { return RANDOM.nextInt(3); } + @Override public long nextValue() { return 3 + RANDOM.nextInt(8); @@ -89,6 +94,7 @@ public class LongFieldDataBenchmark { public int numValues() { return RANDOM.nextFloat() < 0.1f ? 1 : 0; } + @Override public long nextValue() { return RANDOM.nextLong(); @@ -98,6 +104,7 @@ public class LongFieldDataBenchmark { public int numValues() { return RANDOM.nextFloat() < 0.1f ? 1 + RANDOM.nextInt(5) : 0; } + @Override public long nextValue() { return RANDOM.nextLong(); @@ -107,12 +114,15 @@ public class LongFieldDataBenchmark { public int numValues() { return 1 + RANDOM.nextInt(3); } + @Override public long nextValue() { return RANDOM.nextLong(); } }; + public abstract int numValues(); + public abstract long nextValue(); } @@ -132,7 +142,7 @@ public class LongFieldDataBenchmark { } indexWriter.addDocument(doc); } - indexWriter.forceMerge(1); + Merges.forceMerge(indexWriter, 1); indexWriter.close(); final DirectoryReader dr = DirectoryReader.open(dir); diff --git a/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index 50968bca768..855605866a4 100644 --- a/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.Numbers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.VersionFieldMapper; +import org.elasticsearch.index.merge.Merges; import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy; import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.hamcrest.MatcherAssert; @@ -266,7 +267,7 @@ public class VersionsTests extends ElasticsearchLuceneTestCase { .put("1", 0L).put("2", 0L).put("3", 0L).put("4", 4L).put("5", 5L).put("6", 6L).build(); // Force merge and check versions - iw.forceMerge(1); + Merges.forceMerge(iw, 1); final AtomicReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory())); final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME); assertThat(versions, notNullValue()); diff --git a/src/test/java/org/elasticsearch/index/codec/postingformat/DefaultPostingsFormatTests.java b/src/test/java/org/elasticsearch/index/codec/postingformat/DefaultPostingsFormatTests.java index 40030193f52..a8e3a144e27 100644 --- a/src/test/java/org/elasticsearch/index/codec/postingformat/DefaultPostingsFormatTests.java +++ b/src/test/java/org/elasticsearch/index/codec/postingformat/DefaultPostingsFormatTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat; import org.elasticsearch.index.codec.postingsformat.Elasticsearch090PostingsFormat; import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.merge.Merges; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -93,7 +94,7 @@ public class DefaultPostingsFormatTests extends ElasticsearchTestCase { for (int i = 0; i < 100; i++) { writer.addDocument(Arrays.asList(new TextField("foo", "foo bar foo bar", Store.YES), new TextField("some_other_field", "1234", Store.YES))); } - writer.forceMerge(1); + Merges.forceMerge(writer, 1); writer.commit(); DirectoryReader reader = DirectoryReader.open(writer, false); diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index b81ea24efa8..0d990937fba 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -185,7 +185,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS)); } - protected MergeSchedulerProvider createMergeScheduler() { + protected MergeSchedulerProvider createMergeScheduler() { return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool); } @@ -193,7 +193,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { return createEngine(indexSettingsService, store, translog, createMergeScheduler()); } - protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { + protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { return new InternalEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider, new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index())); } diff --git a/src/test/java/org/elasticsearch/index/fielddata/FilterFieldDataTest.java b/src/test/java/org/elasticsearch/index/fielddata/FilterFieldDataTest.java index bb11b99db46..91dc5efed1e 100644 --- a/src/test/java/org/elasticsearch/index/fielddata/FilterFieldDataTest.java +++ b/src/test/java/org/elasticsearch/index/fielddata/FilterFieldDataTest.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.fielddata.AtomicFieldData.WithOrdinals; import org.elasticsearch.index.fielddata.ScriptDocValues.Strings; import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; +import org.elasticsearch.index.merge.Merges; import org.junit.Test; import java.util.Random; @@ -60,7 +61,7 @@ public class FilterFieldDataTest extends AbstractFieldDataTests { } writer.addDocument(d); } - writer.forceMerge(1); + Merges.forceMerge(writer, 1); AtomicReaderContext context = refreshReader(); String[] formats = new String[] { "fst", "paged_bytes"}; @@ -162,8 +163,8 @@ public class FilterFieldDataTest extends AbstractFieldDataTests { } writer.addDocument(d); } - System.out.println(hundred + " " + ten + " " +five); - writer.forceMerge(1); + System.out.println(hundred + " " + ten + " " + five); + Merges.forceMerge(writer, 1); AtomicReaderContext context = refreshReader(); String[] formats = new String[] { "fst", "paged_bytes"}; for (String format : formats) { diff --git a/src/test/java/org/elasticsearch/index/fielddata/LongFieldDataTests.java b/src/test/java/org/elasticsearch/index/fielddata/LongFieldDataTests.java index b8a4e307097..288cfea5307 100644 --- a/src/test/java/org/elasticsearch/index/fielddata/LongFieldDataTests.java +++ b/src/test/java/org/elasticsearch/index/fielddata/LongFieldDataTests.java @@ -28,6 +28,7 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.index.Term; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.fielddata.plain.PackedArrayAtomicFieldData; +import org.elasticsearch.index.merge.Merges; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -325,7 +326,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests { } writer.addDocument(doc); } - writer.forceMerge(1); + Merges.forceMerge(writer, 1); final IndexNumericFieldData indexFieldData = getForField("value"); final AtomicNumericFieldData atomicFieldData = indexFieldData.load(refreshReader()); diff --git a/src/test/java/org/elasticsearch/search/suggest/completion/CompletionPostingsFormatTest.java b/src/test/java/org/elasticsearch/search/suggest/completion/CompletionPostingsFormatTest.java index eae32313baf..650ac2ab352 100644 --- a/src/test/java/org/elasticsearch/search/suggest/completion/CompletionPostingsFormatTest.java +++ b/src/test/java/org/elasticsearch/search/suggest/completion/CompletionPostingsFormatTest.java @@ -41,6 +41,7 @@ import org.elasticsearch.index.codec.postingsformat.PreBuiltPostingsFormatProvid import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.core.AbstractFieldMapper; import org.elasticsearch.index.mapper.core.CompletionFieldMapper; +import org.elasticsearch.index.merge.Merges; import org.elasticsearch.search.suggest.SuggestUtils; import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat.LookupFactory; import org.elasticsearch.test.ElasticsearchTestCase; @@ -265,7 +266,7 @@ public class CompletionPostingsFormatTest extends ElasticsearchTestCase { writer.addDocument(doc); } writer.commit(); - writer.forceMerge(1); + Merges.forceMerge(writer, 1); writer.commit(); DirectoryReader reader = DirectoryReader.open(writer, true); assertThat(reader.leaves().size(), equalTo(1)); diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 0c896b02e0e..be482cff9ff 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; @@ -61,6 +60,10 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.mapper.FieldMapper.Loading; import org.elasticsearch.index.merge.policy.*; +import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; +import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; +import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; +import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.repositories.RepositoryMissingException; @@ -295,7 +298,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase client().admin().indices().preparePutTemplate("random_index_template") .setTemplate("*") .setOrder(0) - .setSettings(setRandomNormsLoading(setRandomMergePolicy(getRandom(), ImmutableSettings.builder()) + .setSettings(setRandomNormsLoading(setRandomMerge(getRandom(), ImmutableSettings.builder()) .put(INDEX_SEED_SETTING, randomLong()))) .execute().actionGet(); } @@ -308,24 +311,38 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return builder; } - private static ImmutableSettings.Builder setRandomMergePolicy(Random random, ImmutableSettings.Builder builder) { + private static ImmutableSettings.Builder setRandomMerge(Random random, ImmutableSettings.Builder builder) { if (random.nextBoolean()) { builder.put(AbstractMergePolicyProvider.INDEX_COMPOUND_FORMAT, random.nextBoolean() ? random.nextDouble() : random.nextBoolean()); } - Class> clazz = TieredMergePolicyProvider.class; + Class> mergePolicy = TieredMergePolicyProvider.class; switch (random.nextInt(5)) { case 4: - clazz = LogByteSizeMergePolicyProvider.class; + mergePolicy = LogByteSizeMergePolicyProvider.class; break; case 3: - clazz = LogDocMergePolicyProvider.class; + mergePolicy = LogDocMergePolicyProvider.class; break; case 0: - return builder; // don't set the setting at all + mergePolicy = null; } - assert clazz != null; - builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, clazz.getName()); + if (mergePolicy != null) { + builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, mergePolicy.getName()); + } + + if (random.nextBoolean()) { + builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean()); + } + switch (random.nextInt(5)) { + case 4: + builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, SerialMergeSchedulerProvider.class.getName()); + break; + case 3: + builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class.getName()); + break; + } + return builder; }