fail a shard if a merge failure occurs

Shay Banon 2013-02-27 23:44:55 +01:00
parent e908c723f1
commit 7400c30eba
6 changed files with 79 additions and 19 deletions

RobinEngine.java

@@ -139,6 +139,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final ApplySettings applySettings = new ApplySettings();
private volatile boolean failOnMergeFailure;
private Throwable failedEngine = null;
private final Object failedEngineMutex = new Object();
private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<FailedEngineListener>();
@@ -185,6 +186,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
this.indexSettingsService.addListener(applySettings);
this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true);
if (failOnMergeFailure) {
this.mergeScheduler.addFailureListener(new FailEngineOnMergeFailure());
}
}
@Override
@@ -1225,6 +1231,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@Override
public void onFailedMerge(MergePolicy.MergeException e) {
failEngine(e);
}
}
private void failEngine(Throwable failure) {
synchronized (failedEngineMutex) {
if (failedEngine != null) {
@@ -1331,6 +1344,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency";
public static final String INDEX_GC_DELETES = "index.gc_deletes";
public static final String INDEX_CODEC = "index.codec";
public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
class ApplySettings implements IndexSettingsService.Listener {
@Override
@@ -1344,9 +1358,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
int termIndexInterval = settings.getAsInt(INDEX_TERM_INDEX_INTERVAL, RobinEngine.this.termIndexInterval);
int termIndexDivisor = settings.getAsInt(INDEX_TERM_INDEX_DIVISOR, RobinEngine.this.termIndexDivisor); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, RobinEngine.this.indexConcurrency);
boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure);
String codecName = settings.get(INDEX_CODEC, RobinEngine.this.codecName);
boolean requiresFlushing = false;
if (termIndexInterval != RobinEngine.this.termIndexInterval || termIndexDivisor != RobinEngine.this.termIndexDivisor || indexConcurrency != RobinEngine.this.indexConcurrency || !codecName.equals(RobinEngine.this.codecName)) {
if (termIndexInterval != RobinEngine.this.termIndexInterval || termIndexDivisor != RobinEngine.this.termIndexDivisor || indexConcurrency != RobinEngine.this.indexConcurrency || !codecName.equals(RobinEngine.this.codecName) || failOnMergeFailure != RobinEngine.this.failOnMergeFailure) {
rwl.readLock().lock();
try {
if (termIndexInterval != RobinEngine.this.termIndexInterval) {
@@ -1373,6 +1388,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// we want to flush in this case, so the new codec will be reflected right away...
requiresFlushing = true;
}
if (failOnMergeFailure != RobinEngine.this.failOnMergeFailure) {
logger.info("updating {} from [{}] to [{}]", RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure, failOnMergeFailure);
RobinEngine.this.failOnMergeFailure = failOnMergeFailure;
}
} finally {
rwl.readLock().unlock();
}
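
Note: only the opening guard of failEngine(Throwable) is visible in the hunks above. As a rough stand-alone sketch (simplified names and members, not the actual RobinEngine code), the fail-once-then-notify pattern it implements looks roughly like this:

    import java.util.concurrent.CopyOnWriteArrayList;

    // Rough sketch of a "fail the engine once, then notify listeners" guard.
    // EngineFailureListener is a simplified stand-in for FailedEngineListener.
    class FailOnceGuard {
        interface EngineFailureListener {
            void onFailedEngine(Throwable failure);
        }

        private final Object failedEngineMutex = new Object();
        private final CopyOnWriteArrayList<EngineFailureListener> listeners =
                new CopyOnWriteArrayList<EngineFailureListener>();
        private Throwable failedEngine = null;

        void addFailedEngineListener(EngineFailureListener listener) {
            listeners.add(listener);
        }

        void failEngine(Throwable failure) {
            synchronized (failedEngineMutex) {
                if (failedEngine != null) {
                    return; // already failed once; keep the first cause
                }
                failedEngine = failure;
                for (EngineFailureListener listener : listeners) {
                    listener.onFailedEngine(failure);
                }
                // the real engine would also close its writer/searcher here
            }
        }
    }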

ConcurrentMergeSchedulerProvider.java

@@ -29,8 +29,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Set;
@@ -39,7 +39,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
/**
*
*/
public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponent implements MergeSchedulerProvider {
public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
private final int maxThreadCount;
private final int maxMergeCount;
@@ -47,8 +47,8 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
private Set<CustomConcurrentMergeScheduler> schedulers = new CopyOnWriteArraySet<CustomConcurrentMergeScheduler>();
@Inject
public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
super(shardId, indexSettings, threadPool);
// TODO LUCENE MONITOR this will change in Lucene 4.0
this.maxThreadCount = componentSettings.getAsInt("max_thread_count", Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2)));
@@ -97,6 +97,7 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
@Override
protected void handleMergeException(Throwable exc) {
logger.warn("failed to merge", exc);
provider.failedMerge(new MergePolicy.MergeException(exc, dir));
super.handleMergeException(exc);
}

MergeSchedulerProvider.java

@@ -19,16 +19,57 @@
package org.elasticsearch.index.merge.scheduler;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*
*/
public interface MergeSchedulerProvider<T extends MergeScheduler> extends IndexShardComponent {
public abstract class MergeSchedulerProvider<T extends MergeScheduler> extends AbstractIndexShardComponent implements IndexShardComponent {
T newMergeScheduler();
public static interface FailureListener {
void onFailedMerge(MergePolicy.MergeException e);
}
MergeStats stats();
private final ThreadPool threadPool;
private final CopyOnWriteArrayList<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
private final boolean notifyOnMergeFailure;
protected MergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.notifyOnMergeFailure = componentSettings.getAsBoolean("notify_on_failure", true);
}
public void addFailureListener(FailureListener listener) {
failureListeners.add(listener);
}
protected void failedMerge(final MergePolicy.MergeException e) {
if (!notifyOnMergeFailure) {
return;
}
for (final FailureListener failureListener : failureListeners) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
failureListener.onFailedMerge(e);
}
});
}
}
public abstract T newMergeScheduler();
public abstract MergeStats stats();
}
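
The asynchronous fan-out in failedMerge is the core of the new base class: the merge thread only schedules notifications on the generic thread pool, so a slow or failing listener cannot stall merging. The same pattern, reduced to a self-contained sketch with a plain ExecutorService standing in for the Elasticsearch thread pool (class and method names here are illustrative, not the real ones):

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Stand-alone sketch of the failure fan-out in MergeSchedulerProvider.
    // MergeFailureListener and the executor wiring are illustrative only.
    class MergeFailureNotifier {
        interface MergeFailureListener {
            void onFailedMerge(Exception e);
        }

        private final CopyOnWriteArrayList<MergeFailureListener> listeners =
                new CopyOnWriteArrayList<MergeFailureListener>();
        private final ExecutorService executor = Executors.newCachedThreadPool();
        private final boolean notifyOnMergeFailure;

        MergeFailureNotifier(boolean notifyOnMergeFailure) {
            this.notifyOnMergeFailure = notifyOnMergeFailure;
        }

        void addFailureListener(MergeFailureListener listener) {
            listeners.add(listener);
        }

        // Called from the merge thread; each listener runs asynchronously.
        void failedMerge(final Exception e) {
            if (!notifyOnMergeFailure) {
                return;
            }
            for (final MergeFailureListener listener : listeners) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        listener.onFailedMerge(e);
                    }
                });
            }
        }
    }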

SerialMergeSchedulerProvider.java

@@ -19,17 +19,14 @@
package org.elasticsearch.index.merge.scheduler;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.TrackingSerialMergeScheduler;
import org.apache.lucene.index.*;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Set;
@@ -38,13 +35,13 @@ import java.util.concurrent.CopyOnWriteArraySet;
/**
*
*/
public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent implements MergeSchedulerProvider {
public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {
private Set<CustomSerialMergeScheduler> schedulers = new CopyOnWriteArraySet<CustomSerialMergeScheduler>();
@Inject
public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
super(shardId, indexSettings, threadPool);
logger.trace("using [serial] merge scheduler");
}
@@ -78,9 +75,10 @@ public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent im
public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
try {
super.merge(writer);
} catch (IOException e) {
} catch (Throwable e) {
logger.warn("failed to merge", e);
throw e;
provider.failedMerge(new MergePolicy.MergeException(e, writer.getDirectory()));
throw new MergePolicy.MergeException(e, writer.getDirectory());
}
}
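
Two things change in the serial scheduler's merge(...) override: the catch widens from IOException to Throwable, and the failure is both reported to the provider and rethrown wrapped in MergePolicy.MergeException instead of being rethrown as-is. The general catch/notify/rethrow shape, sketched with placeholder types rather than the Lucene or Elasticsearch classes:

    // Sketch of the widened error handling: report every failure to a sink,
    // then rethrow a wrapped exception so the caller still sees the failure.
    class GuardedMerger {
        interface FailureSink {
            void failedMerge(Throwable t); // stand-in for MergeSchedulerProvider.failedMerge
        }

        private final FailureSink sink;

        GuardedMerger(FailureSink sink) {
            this.sink = sink;
        }

        void merge(Runnable doMerge) {
            try {
                doMerge.run();
            } catch (Throwable t) {
                sink.failedMerge(t); // hand the failure to the provider
                throw new RuntimeException("merge failed", t); // stand-in for MergePolicy.MergeException
            }
        }
    }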

IndexDynamicSettingsModule.java

@@ -79,6 +79,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
RobinEngine.INDEX_INDEX_CONCURRENCY,
RobinEngine.INDEX_GC_DELETES,
RobinEngine.INDEX_CODEC,
RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE,
ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN,
ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO,
ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG,
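
Registering RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE above is what makes the flag updatable on a live index. A hedged sketch of toggling it through the Java client of that era follows; the client calls (settingsBuilder, prepareUpdateSettings, setSettings) are recalled from the 0.20/0.90-era API and are assumptions, not part of this commit:

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;

    class FailOnMergeFailureToggle {
        // Disable (or re-enable) index.fail_on_merge_failure on an existing index.
        // The index name is a placeholder; the API calls are assumptions.
        static void setFailOnMergeFailure(Client client, String index, boolean enabled) {
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("index.fail_on_merge_failure", enabled)
                    .build();
            client.admin().indices()
                    .prepareUpdateSettings(index)
                    .setSettings(settings)
                    .execute().actionGet();
        }
    }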

AbstractSimpleEngineTests.java

@@ -161,7 +161,7 @@ public abstract class AbstractSimpleEngineTests {
}
protected MergeSchedulerProvider createMergeScheduler() {
return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS);
return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
}
protected abstract Engine createEngine(Store store, Translog translog);