#6120 Remove SerialMergeScheduler (master only)

It's dangerous to expose SerialMergeScheduler as an option: since it only allows one merge at a time, it can easily cause merging to fall behind.

Closes #6120
This commit is contained in:
mikemccand 2014-05-12 14:06:20 -04:00
parent eae304aa39
commit 254ebc2f88
6 changed files with 9 additions and 402 deletions

View File

@ -185,19 +185,11 @@ Defaults to unbounded.
[[scheduling]] [[scheduling]]
=== Scheduling === Scheduling
The merge schedule controls the execution of merge operations once they The merge scheduler (ConcurrentMergeScheduler) controls the execution of
are needed (according to the merge policy). The following types are merge operations once they are needed (according to the merge policy). Merges
supported, with the default being the `ConcurrentMergeScheduler`. run in separate threads, and when the maximum number of threads is reached,
further merges will wait until a merge thread becomes available. The merge
[float] scheduler supports this setting:
==== ConcurrentMergeScheduler
A merge scheduler that runs merges using a separate thread. When the maximum
number of threads is reached, further merges will wait until a merge thread
becomes available.
The scheduler supports the following settings:
`index.merge.scheduler.max_thread_count`:: `index.merge.scheduler.max_thread_count`::
@ -209,22 +201,3 @@ coming[1.2.0]
The default will change to `1` which works best with spinning-magnets The default will change to `1` which works best with spinning-magnets
disks. If you are using a good solid-state disk (SSD) instead then try disks. If you are using a good solid-state disk (SSD) instead then try
setting this to `3`. setting this to `3`.
[float]
==== SerialMergeScheduler
A merge scheduler that simply does each merge sequentially using the
calling thread (blocking the operations that triggered the merge or the
index operation). This merge scheduler has a merge thread pool that
explicitly schedules merges, and it makes sure that merges are serial
within a shard, yet concurrent across multiple shards.
The scheduler supports the following settings:
`index.merge.scheduler.max_merge_at_once`::
The maximum number of merges a single merge run performs. This setting prevents
executing an unlimited number of merges in a loop until another shard has a
chance to get a merge thread from the pool. If this limit is reached, the
merge thread returns to the pool and continues once the call to a single
shard is executed. The default is `5`.

View File

@ -1,168 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.index;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.merge.OnGoingMerge;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
// LUCENE MONITOR - Copied from SerialMergeScheduler
/**
 * Copy of Lucene's SerialMergeScheduler (see the "LUCENE MONITOR" note above)
 * extended with merge statistics tracking. Merges run sequentially on the
 * calling thread — the whole {@link #merge} method is synchronized, so only
 * one merge may execute at a time — and at most {@code maxMergeAtOnce} merges
 * are performed per {@code merge()} invocation before control returns to the
 * caller.
 */
public class TrackingSerialMergeScheduler extends MergeScheduler {
protected final ESLogger logger;
// Cumulative metrics over all completed merges (count + total time in ms).
private final MeanMetric totalMerges = new MeanMetric();
private final CounterMetric totalMergesNumDocs = new CounterMetric();
private final CounterMetric totalMergesSizeInBytes = new CounterMetric();
// Gauges for merges currently in flight (decremented in the finally block below).
private final CounterMetric currentMerges = new CounterMetric();
private final CounterMetric currentMergesNumDocs = new CounterMetric();
private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
// Live view of in-flight merges; exposed read-only via onGoingMerges().
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
// Upper bound on the number of merges performed per merge() call.
private final int maxMergeAtOnce;
public TrackingSerialMergeScheduler(ESLogger logger, int maxMergeAtOnce) {
this.logger = logger;
this.maxMergeAtOnce = maxMergeAtOnce;
}
/** Returns the number of merges completed by this scheduler. */
public long totalMerges() {
return totalMerges.count();
}
/** Returns the total time (in milliseconds) spent in completed merges. */
public long totalMergeTime() {
return totalMerges.sum();
}
/** Returns the total number of documents across all completed merges. */
public long totalMergeNumDocs() {
return totalMergesNumDocs.count();
}
/** Returns the total byte size across all completed merges. */
public long totalMergeSizeInBytes() {
return totalMergesSizeInBytes.count();
}
/** Returns the number of merges currently executing. */
public long currentMerges() {
return currentMerges.count();
}
/** Returns the number of documents in merges currently executing. */
public long currentMergesNumDocs() {
return currentMergesNumDocs.count();
}
/** Returns the byte size of merges currently executing. */
public long currentMergesSizeInBytes() {
return currentMergesSizeInBytes.count();
}
/** Returns a read-only live view of the merges currently executing. */
public Set<OnGoingMerge> onGoingMerges() {
return readOnlyOnGoingMerges;
}
/**
 * Just do the merges in sequence. We do this
 * "synchronized" so that even if the application is using
 * multiple threads, only one merge may run at a time.
 */
@Override
synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws CorruptIndexException, IOException {
// Cap the number of merges per call so one shard cannot monopolize the
// calling thread while other shards wait for a merge run.
int cycle = 0;
while (cycle++ < maxMergeAtOnce) {
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null)
break;
// different from serial merge, call mergeInit here so we get the correct stats
// mergeInit can be called several times without side affects (checks on merge.info not being null)
writer.mergeInit(merge);
int totalNumDocs = merge.totalNumDocs();
long totalSizeInBytes = merge.totalBytesSize();
long time = System.currentTimeMillis();
currentMerges.inc();
currentMergesNumDocs.inc(totalNumDocs);
currentMergesSizeInBytes.inc(totalSizeInBytes);
OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
onGoingMerges.add(onGoingMerge);
// sadly, segment name is not available since mergeInit is called from merge itself...
if (logger.isTraceEnabled()) {
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", merge.info == null ? "_na_" : merge.info.info.name, merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
}
try {
beforeMerge(onGoingMerge);
writer.merge(merge);
} finally {
// Always roll the in-flight gauges back and fold this merge into the
// totals, even when writer.merge() throws.
long took = System.currentTimeMillis() - time;
onGoingMerges.remove(onGoingMerge);
afterMerge(onGoingMerge);
currentMerges.dec();
currentMergesNumDocs.dec(totalNumDocs);
currentMergesSizeInBytes.dec(totalSizeInBytes);
totalMergesNumDocs.inc(totalNumDocs);
totalMergesSizeInBytes.inc(totalSizeInBytes);
totalMerges.inc(took);
if (took > 20000) { // if more than 20 seconds, DEBUG log it
logger.debug("merge [{}] done, took [{}]", merge.info == null ? "_na_" : merge.info.info.name, TimeValue.timeValueMillis(took));
} else if (logger.isTraceEnabled()) {
logger.trace("merge [{}] done, took [{}]", merge.info == null ? "_na_" : merge.info.info.name, TimeValue.timeValueMillis(took));
}
}
}
}
/**
 * A callback allowing for custom logic before an actual merge starts.
 */
protected void beforeMerge(OnGoingMerge merge) {
}
/**
 * A callback allowing for custom logic after an actual merge finishes.
 */
protected void afterMerge(OnGoingMerge merge) {
}
@Override
public void close() {
}
@Override
public MergeScheduler clone() {
// Lucene IW makes a clone internally but since we hold on to this instance
// the clone will just be the identity.
return this;
}
}

View File

@ -1,120 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.scheduler;
import com.google.common.collect.ImmutableSet;
import org.apache.lucene.index.*;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
*
*/
/**
 * A {@link MergeSchedulerProvider} that builds serial merge schedulers:
 * merges run sequentially on the calling thread, with at most
 * {@code index.merge.scheduler.max_merge_at_once} (default 5) merges per
 * merge run. All schedulers built by this provider are tracked so that
 * stats and on-going merges can be aggregated across them.
 */
public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {

    // Schedulers built by this provider and not yet closed. CopyOnWriteArraySet:
    // iteration (stats/onGoingMerges) is frequent, mutation (build/close) rare.
    private final Set<CustomSerialMergeScheduler> schedulers = new CopyOnWriteArraySet<>();

    // Maximum number of merges a single merge run performs before yielding.
    private final int maxMergeAtOnce;

    @Inject
    public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
        super(shardId, indexSettings, threadPool);
        this.maxMergeAtOnce = componentSettings.getAsInt("max_merge_at_once", 5);
        logger.trace("using [serial] merge scheduler, max_merge_at_once [{}]", maxMergeAtOnce);
    }

    @Override
    public MergeScheduler buildMergeScheduler() {
        CustomSerialMergeScheduler scheduler = new CustomSerialMergeScheduler(logger, this);
        schedulers.add(scheduler);
        return scheduler;
    }

    @Override
    public MergeStats stats() {
        // Sum the per-scheduler counters into a single MergeStats.
        MergeStats mergeStats = new MergeStats();
        for (CustomSerialMergeScheduler scheduler : schedulers) {
            mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
                    scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes());
        }
        return mergeStats;
    }

    @Override
    public Set<OnGoingMerge> onGoingMerges() {
        // Aggregate across ALL live schedulers. The previous implementation
        // returned from inside the loop, so it reported only the first
        // scheduler's merges and silently dropped the rest.
        ImmutableSet.Builder<OnGoingMerge> builder = ImmutableSet.builder();
        for (CustomSerialMergeScheduler scheduler : schedulers) {
            builder.addAll(scheduler.onGoingMerges());
        }
        return builder.build();
    }

    @Override
    public void close() {
        // Nothing to release at the provider level; each scheduler deregisters
        // itself from `schedulers` in its own close().
    }

    /**
     * Serial merge scheduler that notifies its provider of merge lifecycle
     * events and failures, and deregisters itself from the provider on close.
     */
    public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler {

        private final SerialMergeSchedulerProvider provider;

        public CustomSerialMergeScheduler(ESLogger logger, SerialMergeSchedulerProvider provider) {
            super(logger, provider.maxMergeAtOnce);
            this.provider = provider;
        }

        @Override
        public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws CorruptIndexException, IOException {
            try {
                super.merge(writer, trigger, newMergesFound);
            } catch (Throwable e) {
                logger.warn("failed to merge", e);
                // Build the MergeException once and both report and rethrow it
                // (the original allocated two distinct instances).
                MergePolicy.MergeException exc = new MergePolicy.MergeException(e, writer.getDirectory());
                provider.failedMerge(exc);
                throw exc;
            }
        }

        @Override
        public void close() {
            super.close();
            provider.schedulers.remove(this);
        }

        @Override
        protected void beforeMerge(OnGoingMerge merge) {
            super.beforeMerge(merge);
            provider.beforeMerge(merge);
        }

        @Override
        protected void afterMerge(OnGoingMerge merge) {
            super.afterMerge(merge);
            provider.afterMerge(merge);
        }
    }
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.index.TrackingSerialMergeScheduler;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.merge.EnableMergeScheduler;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.junit.Test;
/**
*/
/**
 * Verifies that TrackingSerialMergeScheduler honors its maxMergeAtOnce limit:
 * each merge() invocation performs at most the configured number of merges,
 * so draining many pending merges requires several maybeMerge() rounds.
 */
public class TrackingSerialMergeSchedulerTests extends ElasticsearchLuceneTestCase {
@Test
public void testMaxMergeAtOnce() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
// create a tracking merge scheduler, but an "enabled" one, so we can control when it merges;
// maxMergeAtOnce is 2, i.e. at most 2 merges per merge() call
EnableMergeScheduler mergeScheduler = new EnableMergeScheduler(new TrackingSerialMergeScheduler(Loggers.getLogger(getTestClass()), 2));
iwc.setMergeScheduler(mergeScheduler);
TieredMergePolicy mergePolicy = new TieredMergePolicy();
// merge at most 3 segments at a time, so the 20 segments below produce multiple pending merges
mergePolicy.setMaxMergeAtOnceExplicit(3);
mergePolicy.setMaxMergeAtOnce(3);
iwc.setMergePolicy(mergePolicy);
IndexWriter iw = new IndexWriter(dir, iwc);
// create 20 segments
for (int i = 0; i < 20; i++) {
Document doc = new Document();
doc.add(new StringField("id", Integer.toString(i), Field.Store.NO));
iw.addDocument(doc);
iw.commit(); // create a segment, no merge will happen, it's disabled
}
// based on the merge policy's max-merge-at-once of 3, and the fact that we allow only
// 2 merges per maybeMerge() call in our serial merge scheduler configuration, we expect
// to need several maybeMerge() rounds to work through the pending merges
for (int i = 0; i < 4; i++) {
assertTrue(iw.hasPendingMerges());
iw.maybeMerge();
assertTrue(iw.hasPendingMerges());
}
// the final round drains the remaining pending merges
iw.maybeMerge();
assertFalse(iw.hasPendingMerges());
iw.close(false);
dir.close();
}
}

View File

@ -57,7 +57,6 @@ import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
@ -198,12 +197,12 @@ public class InternalEngineTests extends ElasticsearchTestCase {
return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS)); return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS));
} }
protected MergeSchedulerProvider createMergeScheduler() { protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) {
return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool); return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService);
} }
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog) { protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog) {
return createEngine(indexSettingsService, store, translog, createMergeScheduler()); return createEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService));
} }
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {

View File

@ -76,7 +76,6 @@ import org.elasticsearch.index.merge.policy.*;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.IndicesStore;
@ -433,10 +432,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
if (random.nextBoolean()) { if (random.nextBoolean()) {
builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean()); builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean());
} }
switch (random.nextInt(5)) { switch (random.nextInt(4)) {
case 4:
builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, SerialMergeSchedulerProvider.class.getName());
break;
case 3: case 3:
builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class.getName()); builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class.getName());
break; break;