diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 006665d38d8..fd28ac7b86f 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -139,6 +139,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final ApplySettings applySettings = new ApplySettings(); + private volatile boolean failOnMergeFailure; private Throwable failedEngine = null; private final Object failedEngineMutex = new Object(); private final CopyOnWriteArrayList failedEngineListeners = new CopyOnWriteArrayList(); @@ -185,6 +186,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } this.indexSettingsService.addListener(applySettings); + + this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true); + if (failOnMergeFailure) { + this.mergeScheduler.addFailureListener(new FailEngineOnMergeFailure()); + } } @Override @@ -1225,6 +1231,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } + class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener { + @Override + public void onFailedMerge(MergePolicy.MergeException e) { + failEngine(e); + } + } + private void failEngine(Throwable failure) { synchronized (failedEngineMutex) { if (failedEngine != null) { @@ -1331,6 +1344,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency"; public static final String INDEX_GC_DELETES = "index.gc_deletes"; public static final String INDEX_CODEC = "index.codec"; + public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure"; class ApplySettings implements IndexSettingsService.Listener { @Override @@ -1344,9 +1358,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { int termIndexInterval = settings.getAsInt(INDEX_TERM_INDEX_INTERVAL, RobinEngine.this.termIndexInterval); int termIndexDivisor = settings.getAsInt(INDEX_TERM_INDEX_DIVISOR, RobinEngine.this.termIndexDivisor); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, RobinEngine.this.indexConcurrency); + boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure); String codecName = settings.get(INDEX_CODEC, RobinEngine.this.codecName); boolean requiresFlushing = false; - if (termIndexInterval != RobinEngine.this.termIndexInterval || termIndexDivisor != RobinEngine.this.termIndexDivisor || indexConcurrency != RobinEngine.this.indexConcurrency || !codecName.equals(RobinEngine.this.codecName)) { + if (termIndexInterval != RobinEngine.this.termIndexInterval || termIndexDivisor != RobinEngine.this.termIndexDivisor || indexConcurrency != RobinEngine.this.indexConcurrency || !codecName.equals(RobinEngine.this.codecName) || failOnMergeFailure != RobinEngine.this.failOnMergeFailure) { rwl.readLock().lock(); try { if (termIndexInterval != RobinEngine.this.termIndexInterval) { @@ -1373,6 +1388,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // we want to flush in this case, so the new codec will be reflected right away... requiresFlushing = true; } + if (failOnMergeFailure != RobinEngine.this.failOnMergeFailure) { + logger.info("updating {} from [{}] to [{}]", RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure, failOnMergeFailure); + RobinEngine.this.failOnMergeFailure = failOnMergeFailure; + } } finally { rwl.readLock().unlock(); } diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java index a5a2013bdad..9c54eca6ac0 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java @@ -29,8 +29,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Set; @@ -39,7 +39,7 @@ import java.util.concurrent.CopyOnWriteArraySet; /** * */ -public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponent implements MergeSchedulerProvider { +public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { private final int maxThreadCount; private final int maxMergeCount; @@ -47,8 +47,8 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen private Set schedulers = new CopyOnWriteArraySet(); @Inject - public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings) { - super(shardId, indexSettings); + public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) { + super(shardId, indexSettings, threadPool); // TODO LUCENE MONITOR this will change in Lucene 4.0 this.maxThreadCount = componentSettings.getAsInt("max_thread_count", Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2))); @@ -97,6 +97,7 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen @Override protected void handleMergeException(Throwable exc) { logger.warn("failed to merge", exc); + provider.failedMerge(new MergePolicy.MergeException(exc, dir)); super.handleMergeException(exc); } diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java index 2d03278029d..744954b41fa 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java @@ -19,16 +19,57 @@ package org.elasticsearch.index.merge.scheduler; +import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergeScheduler; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.CopyOnWriteArrayList; /** * */ -public interface MergeSchedulerProvider extends IndexShardComponent { +public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent { - T newMergeScheduler(); + public static interface FailureListener { + void onFailedMerge(MergePolicy.MergeException e); + } - MergeStats stats(); + private final ThreadPool threadPool; + private final CopyOnWriteArrayList failureListeners = new CopyOnWriteArrayList(); + + private final boolean notifyOnMergeFailure; + + protected MergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) { + super(shardId, indexSettings); + this.threadPool = threadPool; + this.notifyOnMergeFailure = componentSettings.getAsBoolean("notify_on_failure", true); + } + + public void addFailureListener(FailureListener listener) { + failureListeners.add(listener); + } + + protected void failedMerge(final MergePolicy.MergeException e) { + if (!notifyOnMergeFailure) { + return; + } + for (final FailureListener failureListener : failureListeners) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + failureListener.onFailedMerge(e); + } + }); + } + } + + public abstract T newMergeScheduler(); + + public abstract MergeStats stats(); } diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java index 87f0762be55..bc3ceccebc8 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java @@ -19,17 +19,14 @@ package org.elasticsearch.index.merge.scheduler; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.MergeScheduler; -import org.apache.lucene.index.TrackingSerialMergeScheduler; +import org.apache.lucene.index.*; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Set; @@ -38,13 +35,13 @@ import java.util.concurrent.CopyOnWriteArraySet; /** * */ -public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent implements MergeSchedulerProvider { +public class SerialMergeSchedulerProvider extends MergeSchedulerProvider { private Set schedulers = new CopyOnWriteArraySet(); @Inject - public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings) { - super(shardId, indexSettings); + public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) { + super(shardId, indexSettings, threadPool); logger.trace("using [serial] merge scheduler"); } @@ -78,9 +75,10 @@ public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent im public void merge(IndexWriter writer) throws CorruptIndexException, IOException { try { super.merge(writer); - } catch (IOException e) { + } catch (Throwable e) { logger.warn("failed to merge", e); - throw e; + provider.failedMerge(new MergePolicy.MergeException(e, writer.getDirectory())); + throw new MergePolicy.MergeException(e, writer.getDirectory()); } } diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index e6966871481..017f6ed0a71 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -79,6 +79,7 @@ public class IndexDynamicSettingsModule extends AbstractModule { RobinEngine.INDEX_INDEX_CONCURRENCY, RobinEngine.INDEX_GC_DELETES, RobinEngine.INDEX_CODEC, + RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE, ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java index 0b9968a9dca..cc2dee36d7b 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java @@ -161,7 +161,7 @@ public abstract class AbstractSimpleEngineTests { } protected MergeSchedulerProvider createMergeScheduler() { - return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS); + return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool); } protected abstract Engine createEngine(Store store, Translog translog);