Move to use serial merge scheduler by default

Today, we use ConcurrentMergeScheduler, and this can be painful since it is concurrent on a shard level, with a max of 3 threads doing concurrent merges. If there are several shards being indexed, then there will be a minor explosion of threads trying to do merges, all being throttled by our merge throttling.
Moving to serial merge scheduler will still maintain concurrency of merges across shards, as we have the merge thread pool that schedules those merges. It will just be a serial one on a specific shard.
Also, on serial merge scheduler, we now have a limit of how many merges it will do in one go, so it will let other shards get their fair chance of merging. We use the pending merges on IW to check if merges are needed or not for it.
Note that if a merge is happening, it will not block due to a sync on the maybeMerge call at indexing (flush) time, since we wrap our merge scheduler with the EnableMergeScheduler, where maybeMerge is not activated during indexing, only with explicit calls to IW#maybeMerge (see Merges).
closes #5447
This commit is contained in:
Shay Banon 2014-03-17 18:22:51 +01:00
parent 917c93d7ee
commit 0ef3b03be1
6 changed files with 97 additions and 7 deletions

View File

@ -187,7 +187,11 @@ Defaults to unbounded.
The merge schedule controls the execution of merge operations once they The merge schedule controls the execution of merge operations once they
are needed (according to the merge policy). The following types are are needed (according to the merge policy). The following types are
supported, with the default being the `ConcurrentMergeScheduler`. supported, with the default being the `SerialMergeScheduler`.
Note, the default is the serial merge scheduler since there is a merge
thread pool that explicitly schedules merges, and it makes sure that
merges are serial within a shard, yet concurrent across multiple shards.
[float] [float]
==== ConcurrentMergeScheduler ==== ConcurrentMergeScheduler

View File

@ -46,8 +46,11 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet(); private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges); private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
public TrackingSerialMergeScheduler(ESLogger logger) { private final int maxMergeAtOnce;
public TrackingSerialMergeScheduler(ESLogger logger, int maxMergeAtOnce) {
this.logger = logger; this.logger = logger;
this.maxMergeAtOnce = maxMergeAtOnce;
} }
public long totalMerges() { public long totalMerges() {
@ -89,7 +92,8 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
*/ */
@Override @Override
synchronized public void merge(IndexWriter writer) throws CorruptIndexException, IOException { synchronized public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
while (true) { int cycle = 0;
while (cycle++ < maxMergeAtOnce) {
MergePolicy.OneMerge merge = writer.getNextMerge(); MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null) if (merge == null)
break; break;

View File

@ -674,7 +674,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
@Override @Override
public boolean possibleMergeNeeded() { public boolean possibleMergeNeeded() {
return this.possibleMergeNeeded; IndexWriter writer = this.indexWriter;
if (writer == null) {
return false;
}
// a merge scheduler might bail without going through all its pending merges
// so make sure we also check if there are pending merges
return this.possibleMergeNeeded || writer.hasPendingMerges();
} }
@Override @Override

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.settings.Settings;
public class MergeSchedulerModule extends AbstractModule { public class MergeSchedulerModule extends AbstractModule {
public static final String MERGE_SCHEDULER_TYPE_KEY = "index.merge.scheduler.type"; public static final String MERGE_SCHEDULER_TYPE_KEY = "index.merge.scheduler.type";
public static final Class<? extends MergeSchedulerProvider> DEFAULT = ConcurrentMergeSchedulerProvider.class; public static final Class<? extends MergeSchedulerProvider> DEFAULT = SerialMergeSchedulerProvider.class;
private final Settings settings; private final Settings settings;

View File

@ -40,11 +40,13 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class SerialMergeSchedulerProvider extends MergeSchedulerProvider { public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {
private Set<CustomSerialMergeScheduler> schedulers = new CopyOnWriteArraySet<CustomSerialMergeScheduler>(); private Set<CustomSerialMergeScheduler> schedulers = new CopyOnWriteArraySet<CustomSerialMergeScheduler>();
private final int maxMergeAtOnce;
@Inject @Inject
public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) { public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
super(shardId, indexSettings, threadPool); super(shardId, indexSettings, threadPool);
logger.trace("using [serial] merge scheduler"); this.maxMergeAtOnce = componentSettings.getAsInt("max_merge_at_once", 5);
logger.trace("using [serial] merge scheduler, max_merge_at_once [{}]", maxMergeAtOnce);
} }
@Override @Override
@ -77,7 +79,7 @@ public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {
private final SerialMergeSchedulerProvider provider; private final SerialMergeSchedulerProvider provider;
public CustomSerialMergeScheduler(ESLogger logger, SerialMergeSchedulerProvider provider) { public CustomSerialMergeScheduler(ESLogger logger, SerialMergeSchedulerProvider provider) {
super(logger); super(logger, provider.maxMergeAtOnce);
this.provider = provider; this.provider = provider;
} }

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.index.TrackingSerialMergeScheduler;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.merge.EnableMergeScheduler;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.junit.Test;
/**
 * Exercises the per-invocation merge cap of {@link TrackingSerialMergeScheduler}.
 */
public class TrackingSerialMergeSchedulerTests extends ElasticsearchLuceneTestCase {

    /**
     * Builds 20 single-document segments while merging is held back, then checks that a
     * scheduler capped at 2 merges per run still reports pending merges after each of the
     * first four explicit merge runs, and none after the fifth.
     */
    @Test
    public void testMaxMergeAtOnce() throws Exception {
        Directory directory = newDirectory();
        MockAnalyzer analyzer = new MockAnalyzer(random());
        IndexWriterConfig config = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);

        // tracking scheduler limited to 2 merges per run, wrapped in the "enable" scheduler
        // so that the test decides when merging actually takes place
        TrackingSerialMergeScheduler cappedScheduler =
                new TrackingSerialMergeScheduler(Loggers.getLogger(getTestClass()), 2);
        config.setMergeScheduler(new EnableMergeScheduler(cappedScheduler));

        TieredMergePolicy tieredPolicy = new TieredMergePolicy();
        tieredPolicy.setMaxMergeAtOnceExplicit(3);
        tieredPolicy.setMaxMergeAtOnce(3);
        config.setMergePolicy(tieredPolicy);

        IndexWriter writer = new IndexWriter(directory, config);

        // one commit per document gives us 20 segments
        int segment = 0;
        while (segment < 20) {
            addSingleDocSegment(writer, segment);
            segment++;
        }

        // with the merge policy limits above, and only 2 merges allowed per maybeMerge call
        // by the serial merge scheduler, we expect to need 4 merge runs that each still
        // leave pending merges behind
        for (int round = 1; round <= 4; round++) {
            assertTrue(writer.hasPendingMerges());
            Merges.maybeMerge(writer);
            assertTrue(writer.hasPendingMerges());
        }

        // the final run works through whatever is left
        Merges.maybeMerge(writer);
        assertFalse(writer.hasPendingMerges());

        writer.close(false);
        directory.close();
    }

    /**
     * Indexes one document whose "id" field holds the given number and commits right away,
     * so the document ends up in a segment of its own (no merge runs here, merging is disabled).
     */
    private static void addSingleDocSegment(IndexWriter writer, int id) throws Exception {
        Document document = new Document();
        document.add(new StringField("id", String.valueOf(id), Field.Store.NO));
        writer.addDocument(document);
        writer.commit();
    }
}