From 254ebc2f88ea05fcd5dce4abf69c476a6aa27d8d Mon Sep 17 00:00:00 2001 From: mikemccand Date: Mon, 12 May 2014 14:06:20 -0400 Subject: [PATCH] #6120 Remove SerialMergeScheduler (master only) It's dangerous to expose SerialMergeScheduler as an option: since it only allows one merge at a time, it can easily cause merging to fall behind. Closes #6120 --- docs/reference/index-modules/merge.asciidoc | 37 +--- .../index/TrackingSerialMergeScheduler.java | 168 ------------------ .../SerialMergeSchedulerProvider.java | 120 ------------- .../TrackingSerialMergeSchedulerTests.java | 73 -------- .../engine/internal/InternalEngineTests.java | 7 +- .../test/ElasticsearchIntegrationTest.java | 6 +- 6 files changed, 9 insertions(+), 402 deletions(-) delete mode 100644 src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java delete mode 100644 src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java delete mode 100644 src/test/java/org/apache/lucene/TrackingSerialMergeSchedulerTests.java diff --git a/docs/reference/index-modules/merge.asciidoc b/docs/reference/index-modules/merge.asciidoc index 98b42c52fa3..65b4825de85 100644 --- a/docs/reference/index-modules/merge.asciidoc +++ b/docs/reference/index-modules/merge.asciidoc @@ -185,19 +185,11 @@ Defaults to unbounded. [[scheduling]] === Scheduling -The merge schedule controls the execution of merge operations once they -are needed (according to the merge policy). The following types are -supported, with the default being the `ConcurrentMergeScheduler`. - -[float] -==== ConcurrentMergeScheduler - -A merge scheduler that runs merges using a separate thread. When the maximum -number of threads is reached, further merges will wait until a merge thread -becomes available. - - -The scheduler supports the following settings: +The merge scheduler (ConcurrentMergeScheduler) controls the execution of +merge operations once they are needed (according to the merge policy). Merges +run in separate threads, and when the maximum number of threads is reached, +further merges will wait until a merge thread becomes available. The merge +scheduler supports this setting: `index.merge.scheduler.max_thread_count`:: @@ -209,22 +201,3 @@ coming[1.2.0] The default will change to `1` which works best with spinning-magnets disks. If you are using a good solid-state disk (SSD) instead then try setting this to `3`. - -[float] -==== SerialMergeScheduler - -A merge scheduler that simply does each merge sequentially using the -calling thread (blocking the operations that triggered the merge or the -index operation). This merge scheduler has a merge thread pool that -explicitly schedules merges, and it makes sure that merges are serial -within a shard, yet concurrent across multiple shards. - -The scheduler supports the following settings: - -`index.merge.scheduler.max_merge_at_once`:: - -The maximum number of merges a single merge run performs. This setting prevents -executing unlimited amount of merges in a loop until another shards has a -chance to get a merge thread from the pool. If this limit is reached the -merge thread returns to the pool and continues once the the call to a single -shards is executed. The default is `5` diff --git a/src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java b/src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java deleted file mode 100644 index 46c5018e1a5..00000000000 --- a/src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.lucene.index; - -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.metrics.MeanMetric; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.merge.OnGoingMerge; - -import java.io.IOException; -import java.util.Collections; -import java.util.Set; - -// LUCENE MONITOR - Copied from SerialMergeScheduler -public class TrackingSerialMergeScheduler extends MergeScheduler { - - protected final ESLogger logger; - - private final MeanMetric totalMerges = new MeanMetric(); - private final CounterMetric totalMergesNumDocs = new CounterMetric(); - private final CounterMetric totalMergesSizeInBytes = new CounterMetric(); - private final CounterMetric currentMerges = new CounterMetric(); - private final CounterMetric currentMergesNumDocs = new CounterMetric(); - private final CounterMetric currentMergesSizeInBytes = new CounterMetric(); - - private final Set onGoingMerges = ConcurrentCollections.newConcurrentSet(); - private final Set readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges); - - private final int maxMergeAtOnce; - - public TrackingSerialMergeScheduler(ESLogger logger, int maxMergeAtOnce) { - this.logger = logger; - this.maxMergeAtOnce = maxMergeAtOnce; - } - - public long totalMerges() { - return totalMerges.count(); - } - - public long totalMergeTime() { - return totalMerges.sum(); - } - - public long totalMergeNumDocs() { - return totalMergesNumDocs.count(); - } - - public long totalMergeSizeInBytes() { - return totalMergesSizeInBytes.count(); - } - - public long currentMerges() { - return currentMerges.count(); - } - - public long currentMergesNumDocs() { - return currentMergesNumDocs.count(); - } - - public long currentMergesSizeInBytes() { - return currentMergesSizeInBytes.count(); - } - - public Set onGoingMerges() { - return readOnlyOnGoingMerges; - } - - /** - * Just do the merges in sequence. We do this - * "synchronized" so that even if the application is using - * multiple threads, only one merge may run at a time. - */ - @Override - synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws CorruptIndexException, IOException { - int cycle = 0; - while (cycle++ < maxMergeAtOnce) { - MergePolicy.OneMerge merge = writer.getNextMerge(); - if (merge == null) - break; - - // different from serial merge, call mergeInit here so we get the correct stats - // mergeInit can be called several times without side affects (checks on merge.info not being null) - writer.mergeInit(merge); - - int totalNumDocs = merge.totalNumDocs(); - long totalSizeInBytes = merge.totalBytesSize(); - long time = System.currentTimeMillis(); - currentMerges.inc(); - currentMergesNumDocs.inc(totalNumDocs); - currentMergesSizeInBytes.inc(totalSizeInBytes); - - OnGoingMerge onGoingMerge = new OnGoingMerge(merge); - onGoingMerges.add(onGoingMerge); - - // sadly, segment name is not available since mergeInit is called from merge itself... - if (logger.isTraceEnabled()) { - logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", merge.info == null ? "_na_" : merge.info.info.name, merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes)); - } - try { - beforeMerge(onGoingMerge); - writer.merge(merge); - } finally { - long took = System.currentTimeMillis() - time; - - onGoingMerges.remove(onGoingMerge); - afterMerge(onGoingMerge); - - currentMerges.dec(); - currentMergesNumDocs.dec(totalNumDocs); - currentMergesSizeInBytes.dec(totalSizeInBytes); - - totalMergesNumDocs.inc(totalNumDocs); - totalMergesSizeInBytes.inc(totalSizeInBytes); - totalMerges.inc(took); - if (took > 20000) { // if more than 20 seconds, DEBUG log it - logger.debug("merge [{}] done, took [{}]", merge.info == null ? "_na_" : merge.info.info.name, TimeValue.timeValueMillis(took)); - } else if (logger.isTraceEnabled()) { - logger.trace("merge [{}] done, took [{}]", merge.info == null ? "_na_" : merge.info.info.name, TimeValue.timeValueMillis(took)); - } - } - } - } - - /** - * A callback allowing for custom logic before an actual merge starts. - */ - protected void beforeMerge(OnGoingMerge merge) { - - } - - /** - * A callback allowing for custom logic before an actual merge starts. - */ - protected void afterMerge(OnGoingMerge merge) { - - } - - @Override - public void close() { - } - - @Override - public MergeScheduler clone() { - // Lucene IW makes a clone internally but since we hold on to this instance - // the clone will just be the identity. - return this; - } -} diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java deleted file mode 100644 index 553bbf92e7a..00000000000 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.merge.scheduler; - -import com.google.common.collect.ImmutableSet; -import org.apache.lucene.index.*; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.merge.OnGoingMerge; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * - */ -public class SerialMergeSchedulerProvider extends MergeSchedulerProvider { - - private Set schedulers = new CopyOnWriteArraySet<>(); - private final int maxMergeAtOnce; - - @Inject - public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) { - super(shardId, indexSettings, threadPool); - this.maxMergeAtOnce = componentSettings.getAsInt("max_merge_at_once", 5); - logger.trace("using [serial] merge scheduler, max_merge_at_once [{}]", maxMergeAtOnce); - } - - @Override - public MergeScheduler buildMergeScheduler() { - CustomSerialMergeScheduler scheduler = new CustomSerialMergeScheduler(logger, this); - schedulers.add(scheduler); - return scheduler; - } - - @Override - public MergeStats stats() { - MergeStats mergeStats = new MergeStats(); - for (CustomSerialMergeScheduler scheduler : schedulers) { - mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(), - scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes()); - } - return mergeStats; - } - - @Override - public Set onGoingMerges() { - for (CustomSerialMergeScheduler scheduler : schedulers) { - return scheduler.onGoingMerges(); - } - return ImmutableSet.of(); - } - - @Override - public void close() { - - } - - public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler { - - private final SerialMergeSchedulerProvider provider; - - public CustomSerialMergeScheduler(ESLogger logger, SerialMergeSchedulerProvider provider) { - super(logger, provider.maxMergeAtOnce); - this.provider = provider; - } - - @Override - public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws CorruptIndexException, IOException { - try { - super.merge(writer, trigger, newMergesFound); - } catch (Throwable e) { - logger.warn("failed to merge", e); - provider.failedMerge(new MergePolicy.MergeException(e, writer.getDirectory())); - throw new MergePolicy.MergeException(e, writer.getDirectory()); - } - } - - @Override - public void close() { - super.close(); - provider.schedulers.remove(this); - } - - @Override - protected void beforeMerge(OnGoingMerge merge) { - super.beforeMerge(merge); - provider.beforeMerge(merge); - } - - @Override - protected void afterMerge(OnGoingMerge merge) { - super.afterMerge(merge); - provider.afterMerge(merge); - } - } -} diff --git a/src/test/java/org/apache/lucene/TrackingSerialMergeSchedulerTests.java b/src/test/java/org/apache/lucene/TrackingSerialMergeSchedulerTests.java deleted file mode 100644 index 1f596bf9b4e..00000000000 --- a/src/test/java/org/apache/lucene/TrackingSerialMergeSchedulerTests.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.lucene; - -import org.apache.lucene.analysis.MockAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.StringField; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.TieredMergePolicy; -import org.apache.lucene.index.TrackingSerialMergeScheduler; -import org.apache.lucene.store.Directory; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.index.merge.EnableMergeScheduler; -import org.elasticsearch.test.ElasticsearchLuceneTestCase; -import org.junit.Test; - -/** - */ -public class TrackingSerialMergeSchedulerTests extends ElasticsearchLuceneTestCase { - - @Test - public void testMaxMergeAtOnce() throws Exception { - Directory dir = newDirectory(); - IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); - // create a tracking merge scheduler, but enabled one, so we can control when it merges - EnableMergeScheduler mergeScheduler = new EnableMergeScheduler(new TrackingSerialMergeScheduler(Loggers.getLogger(getTestClass()), 2)); - iwc.setMergeScheduler(mergeScheduler); - TieredMergePolicy mergePolicy = new TieredMergePolicy(); - mergePolicy.setMaxMergeAtOnceExplicit(3); - mergePolicy.setMaxMergeAtOnce(3); - iwc.setMergePolicy(mergePolicy); - IndexWriter iw = new IndexWriter(dir, iwc); - // create 20 segments - for (int i = 0; i < 20; i++) { - Document doc = new Document(); - doc.add(new StringField("id", Integer.toString(i), Field.Store.NO)); - iw.addDocument(doc); - iw.commit(); // create a segment, no merge will happen, its disabled - } - // based on the merge policy maxMerge, and the fact that we allow only for 2 merges to run - // per maybeMerge in our configuration of the serial merge scheduler, the we expect to need - // 4 merge runs to work out through the pending merges - for (int i = 0; i < 4; i++) { - assertTrue(iw.hasPendingMerges()); - iw.maybeMerge(); - assertTrue(iw.hasPendingMerges()); - } - iw.maybeMerge(); - assertFalse(iw.hasPendingMerges()); - - iw.close(false); - dir.close(); - } -} diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 1aff856da2f..06523eb2980 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -57,7 +57,6 @@ import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; -import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.SimilarityService; @@ -198,12 +197,12 @@ public class InternalEngineTests extends ElasticsearchTestCase { return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS)); } - protected MergeSchedulerProvider createMergeScheduler() { - return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool); + protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) { + return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService); } protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog) { - return createEngine(indexSettingsService, store, translog, createMergeScheduler()); + return createEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService)); } protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 96e8202cd68..95bca43a564 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -76,7 +76,6 @@ import org.elasticsearch.index.merge.policy.*; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; -import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider; import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; @@ -433,10 +432,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase if (random.nextBoolean()) { builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean()); } - switch (random.nextInt(5)) { - case 4: - builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, SerialMergeSchedulerProvider.class.getName()); - break; + switch (random.nextInt(4)) { case 3: builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class.getName()); break;