From b772b513e004369f7c2c7fedea9b8715088e029c Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 20 Oct 2015 15:02:06 +0200 Subject: [PATCH] Refactor ShardFailure listener infrastructure Today we leak the notion of an engine outside of the shard abstraction which is not desirable. This commit refactors the infrastrucutre to use use already existing interfaces to communicate if a shard has failed and prevents engine private classes to be implemented on a higher level. This change is purely cosmentical... --- .../elasticsearch/index/engine/Engine.java | 25 +++--------- .../index/engine/EngineConfig.java | 10 ++--- .../index/engine/InternalEngine.java | 5 --- .../index/engine/ShadowEngine.java | 7 ---- .../elasticsearch/index/shard/IndexShard.java | 38 +++++++++++++------ .../cluster/IndicesClusterStateService.java | 33 +++++----------- .../index/engine/InternalEngineTests.java | 6 +-- .../index/engine/ShadowEngineTests.java | 4 +- 8 files changed, 51 insertions(+), 77 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 1431cbd4f9d..2e970c2c3ec 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -72,7 +72,7 @@ public abstract class Engine implements Closeable { protected final EngineConfig engineConfig; protected final Store store; protected final AtomicBoolean isClosed = new AtomicBoolean(false); - protected final FailedEngineListener failedEngineListener; + protected final EventListener eventListener; protected final SnapshotDeletionPolicy deletionPolicy; protected final ReentrantLock failEngineLock = new ReentrantLock(); protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); @@ -89,7 +89,7 @@ public abstract class Engine implements Closeable { this.store = engineConfig.getStore(); this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name engineConfig.getIndexSettings(), engineConfig.getShardId()); - this.failedEngineListener = engineConfig.getFailedEngineListener(); + this.eventListener = engineConfig.getEventListener(); this.deletionPolicy = engineConfig.getDeletionPolicy(); } @@ -535,7 +535,7 @@ public abstract class Engine implements Closeable { logger.warn("Couldn't mark store corrupted", e); } } - failedEngineListener.onFailedEngine(shardId, reason, failure); + eventListener.onFailedEngine(reason, failure); } } catch (Throwable t) { // don't bubble up these exceptions up @@ -560,19 +560,9 @@ public abstract class Engine implements Closeable { return false; } - /** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */ - protected Throwable wrapIfClosed(Throwable t) { - if (isClosed.get()) { - if (t != failedEngine && failedEngine != null) { - t.addSuppressed(failedEngine); - } - return new EngineClosedException(shardId, t); - } - return t; - } - public interface FailedEngineListener { - void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t); + public interface EventListener { + default void onFailedEngine(String reason, @Nullable Throwable t) {} } public static class Searcher implements Releasable { @@ -991,11 +981,6 @@ public abstract class Engine implements Closeable { } } - /** - * Returns true the internal writer has any uncommitted changes. Otherwise false - */ - public abstract boolean hasUncommittedChanges(); - public static class CommitId implements Writeable { private final byte[] id; diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 7ed3d365211..adc04cbee52 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -69,7 +69,7 @@ public final class EngineConfig { private final Analyzer analyzer; private final Similarity similarity; private final CodecService codecService; - private final Engine.FailedEngineListener failedEngineListener; + private final Engine.EventListener eventListener; private final boolean forceNewTranslog; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; @@ -117,7 +117,7 @@ public final class EngineConfig { public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService, Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer, - Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, + Similarity similarity, CodecService codecService, Engine.EventListener eventListener, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -131,7 +131,7 @@ public final class EngineConfig { this.analyzer = analyzer; this.similarity = similarity; this.codecService = codecService; - this.failedEngineListener = failedEngineListener; + this.eventListener = eventListener; this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME); // We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing: @@ -310,8 +310,8 @@ public final class EngineConfig { /** * Returns a listener that should be called on engine failure */ - public Engine.FailedEngineListener getFailedEngineListener() { - return failedEngineListener; + public Engine.EventListener getEventListener() { + return eventListener; } /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 75bcdfa552e..2bae6a9fba8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -846,11 +846,6 @@ public class InternalEngine extends Engine { } } - @Override - public boolean hasUncommittedChanges() { - return indexWriter.hasUncommittedChanges(); - } - @Override protected SearcherManager getSearcherManager() { return searcherManager; diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 921f1167f43..46677d3a55b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -202,8 +202,6 @@ public class ShadowEngine extends Engine { throw new UnsupportedOperationException("Can not take snapshot from a shadow engine"); } - - @Override protected SearcherManager getSearcherManager() { return searcherManager; @@ -223,11 +221,6 @@ public class ShadowEngine extends Engine { } } - @Override - public boolean hasUncommittedChanges() { - return false; - } - @Override protected SegmentInfos getLastCommittedSegmentInfos() { return lastCommittedSegmentInfos; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a6b86e0f59b..6da597e6976 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.support.LoggerMessageFormat; @@ -51,6 +50,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -76,7 +76,6 @@ import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; -import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; @@ -167,7 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); - private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener(); + private final ShardEngineFailListener engineEventListener = new ShardEngineFailListener(); private volatile boolean flushOnClose = true; private volatile int flushThresholdOperations; private volatile ByteSizeValue flushThresholdSize; @@ -979,8 +978,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; - public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) { - this.failedEngineListener.delegates.add(failedEngineListener); + public void addShardFailureCallback(Callback onShardFailure) { + this.engineEventListener.delegates.add(onShardFailure); } /** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than @@ -1369,15 +1368,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett return this.currentEngineReference.get(); } - class ShardEngineFailListener implements Engine.FailedEngineListener { - private final CopyOnWriteArrayList delegates = new CopyOnWriteArrayList<>(); + class ShardEngineFailListener implements Engine.EventListener { + private final CopyOnWriteArrayList> delegates = new CopyOnWriteArrayList<>(); // called by the current engine @Override - public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) { - for (Engine.FailedEngineListener listener : delegates) { + public void onFailedEngine(String reason, @Nullable Throwable failure) { + final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure); + for (Callback listener : delegates) { try { - listener.onFailedEngine(shardId, reason, failure); + listener.handle(shardFailure); } catch (Exception e) { logger.warn("exception while notifying engine failure", e); } @@ -1457,7 +1457,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett }; return new EngineConfig(shardId, threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, - mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig); + mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, engineEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig); } private static class IndexShardOperationCounter extends AbstractRefCounted { @@ -1571,4 +1571,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett return false; } + /** + * Simple struct encapsulating a shard failure + * @see IndexShard#addShardFailureCallback(Callback) + */ + public static final class ShardFailure { + public final ShardRouting routing; + public final String reason; + public final Throwable cause; + + public ShardFailure(ShardRouting routing, String reason, Throwable cause) { + this.routing = routing; + this.reason = reason; + this.cause = cause; + } + } + } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index b4c6b27c57e..4fdc16012a9 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -41,10 +41,10 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexShardAlreadyExistsException; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettingsService; @@ -637,7 +637,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { @Override - public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) { - ShardRouting shardRouting = null; - final IndexService indexService = indicesService.indexService(shardId.index().name()); - if (indexService != null) { - IndexShard indexShard = indexService.getShardOrNull(shardId.id()); - if (indexShard != null) { - shardRouting = indexShard.routingEntry(); - } - } - if (shardRouting == null) { - logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]", failure, - shardId.index().name(), shardId.id(), reason); - return; - } - final ShardRouting fShardRouting = shardRouting; - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - synchronized (mutex) { - failAndRemoveShard(fShardRouting, indexService, true, "engine failure, reason [" + reason + "]", failure); - } + public void handle(final IndexShard.ShardFailure shardFailure) { + final IndexService indexService = indicesService.indexService(shardFailure.routing.shardId().index().name()); + final ShardRouting shardRouting = shardFailure.routing; + threadPool.generic().execute(() -> { + synchronized (mutex) { + failAndRemoveShard(shardRouting, indexService, true, "engine failure, reason [" + shardFailure.reason + "]", shardFailure.cause); } }); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2a6150267a5..4c0aab8dcea 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -268,9 +268,9 @@ public class InternalEngineTests extends ESTestCase { EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings , null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig, - iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() { + iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.EventListener() { @Override - public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { + public void onFailedEngine(String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test } }, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); @@ -1950,7 +1950,7 @@ public class InternalEngineTests extends ESTestCase { EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings() , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(), - config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener() + config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getEventListener() , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); try { diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 7dadafb8a0b..7818a0dc1f3 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -212,9 +212,9 @@ public class ShadowEngineTests extends ESTestCase { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool); EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings , null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig, - iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() { + iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.EventListener() { @Override - public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { + public void onFailedEngine(String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); try {