Merge pull request #14206 from s1monw/refactor_shard_failure_listener

Refactor ShardFailure listener infrastructure
This commit is contained in:
Simon Willnauer 2015-10-21 13:40:56 +02:00
commit 3ba4dfa3a6
8 changed files with 63 additions and 84 deletions

View File

@ -72,7 +72,7 @@ public abstract class Engine implements Closeable {
protected final EngineConfig engineConfig;
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final FailedEngineListener failedEngineListener;
protected final EventListener eventListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
@ -89,7 +89,7 @@ public abstract class Engine implements Closeable {
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings(), engineConfig.getShardId());
this.failedEngineListener = engineConfig.getFailedEngineListener();
this.eventListener = engineConfig.getEventListener();
this.deletionPolicy = engineConfig.getDeletionPolicy();
}
@ -535,7 +535,7 @@ public abstract class Engine implements Closeable {
logger.warn("Couldn't mark store corrupted", e);
}
}
failedEngineListener.onFailedEngine(shardId, reason, failure);
eventListener.onFailedEngine(reason, failure);
}
} catch (Throwable t) {
// don't bubble up these exceptions up
@ -560,19 +560,9 @@ public abstract class Engine implements Closeable {
return false;
}
/** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */
protected Throwable wrapIfClosed(Throwable t) {
if (isClosed.get()) {
if (t != failedEngine && failedEngine != null) {
t.addSuppressed(failedEngine);
}
return new EngineClosedException(shardId, t);
}
return t;
}
public interface FailedEngineListener {
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
public interface EventListener {
default void onFailedEngine(String reason, @Nullable Throwable t) {}
}
public static class Searcher implements Releasable {
@ -991,11 +981,6 @@ public abstract class Engine implements Closeable {
}
}
/**
* Returns <code>true</code> the internal writer has any uncommitted changes. Otherwise <code>false</code>
*/
public abstract boolean hasUncommittedChanges();
public static class CommitId implements Writeable {
private final byte[] id;

View File

@ -69,7 +69,7 @@ public final class EngineConfig {
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
private final Engine.FailedEngineListener failedEngineListener;
private final Engine.EventListener eventListener;
private final boolean forceNewTranslog;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
@ -117,7 +117,7 @@ public final class EngineConfig {
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
this.shardId = shardId;
this.indexSettings = indexSettings;
@ -131,7 +131,7 @@ public final class EngineConfig {
this.analyzer = analyzer;
this.similarity = similarity;
this.codecService = codecService;
this.failedEngineListener = failedEngineListener;
this.eventListener = eventListener;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
@ -310,8 +310,8 @@ public final class EngineConfig {
/**
* Returns a listener that should be called on engine failure
*/
public Engine.FailedEngineListener getFailedEngineListener() {
return failedEngineListener;
public Engine.EventListener getEventListener() {
return eventListener;
}
/**

View File

@ -846,11 +846,6 @@ public class InternalEngine extends Engine {
}
}
@Override
public boolean hasUncommittedChanges() {
return indexWriter.hasUncommittedChanges();
}
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;

View File

@ -202,8 +202,6 @@ public class ShadowEngine extends Engine {
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
}
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
@ -223,11 +221,6 @@ public class ShadowEngine extends Engine {
}
}
@Override
public boolean hasUncommittedChanges() {
return false;
}
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;

View File

@ -41,7 +41,6 @@ import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
@ -51,6 +50,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
@ -76,7 +76,6 @@ import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
@ -167,7 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
private final ShardEventListener shardEventListener = new ShardEventListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
@ -980,8 +979,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
this.failedEngineListener.delegates.add(failedEngineListener);
public void addShardFailureCallback(Callback<ShardFailure> onShardFailure) {
this.shardEventListener.delegates.add(onShardFailure);
}
/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
@ -1370,15 +1369,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return this.currentEngineReference.get();
}
class ShardEngineFailListener implements Engine.FailedEngineListener {
private final CopyOnWriteArrayList<Engine.FailedEngineListener> delegates = new CopyOnWriteArrayList<>();
class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Callback<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
// called by the current engine
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
for (Engine.FailedEngineListener listener : delegates) {
public void onFailedEngine(String reason, @Nullable Throwable failure) {
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure, getIndexUUID());
for (Callback<ShardFailure> listener : delegates) {
try {
listener.onFailedEngine(shardId, reason, failure);
listener.handle(shardFailure);
} catch (Exception e) {
logger.warn("exception while notifying engine failure", e);
}
@ -1458,7 +1458,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
}
private static class IndexShardOperationCounter extends AbstractRefCounted {
@ -1572,4 +1572,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return false;
}
/**
* Simple struct encapsulating a shard failure
* @see IndexShard#addShardFailureCallback(Callback)
*/
public static final class ShardFailure {
public final ShardRouting routing;
public final String reason;
@Nullable
public final Throwable cause;
public final String indexUUID;
public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause, String indexUUID) {
this.routing = routing;
this.reason = reason;
this.cause = cause;
this.indexUUID = indexUUID;
}
}
}

View File

@ -41,10 +41,10 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettingsService;
@ -98,7 +98,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private final Object mutex = new Object();
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
private final FailedShardHandler failedShardHandler = new FailedShardHandler();
private final boolean sendRefreshMapping;
@ -381,7 +381,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// so this failure typically means wrong node level configuration or something similar
for (IndexShard indexShard : indexService) {
ShardRouting shardRouting = indexShard.routingEntry();
failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to update mappings", t);
}
}
}
@ -637,11 +637,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
IndexShard indexShard = indexService.createShard(shardId, shardRouting);
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
indexShard.addFailedEngineListener(failedEngineHandler);
indexShard.addShardFailureCallback(failedShardHandler);
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Throwable e) {
failAndRemoveShard(shardRouting, indexService, true, "failed to create shard", e);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to create shard", e);
return;
}
}
@ -768,7 +768,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
synchronized (mutex) {
failAndRemoveShard(shardRouting, indexService, sendShardFailure, "failed recovery", failure);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, sendShardFailure, "failed recovery", failure);
}
}
@ -802,8 +802,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService.hasShard(shardRouting.getId())) {
private void failAndRemoveShard(ShardRouting shardRouting, String indexUUID, @Nullable IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService != null && indexService.hasShard(shardRouting.getId())) {
// if the indexService is null we can't remove the shard, that's fine since we might have a failure
// when the index is remove and then we already removed the index service for that shard...
try {
indexService.removeShard(shardRouting.getId(), message);
} catch (ShardNotFoundException e) {
@ -813,7 +815,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
if (sendShardFailure) {
sendFailShard(shardRouting, indexService.indexUUID(), message, failure);
sendFailShard(shardRouting, indexUUID, message, failure);
}
}
@ -827,29 +829,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private class FailedEngineHandler implements Engine.FailedEngineListener {
private class FailedShardHandler implements Callback<IndexShard.ShardFailure> {
@Override
public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) {
ShardRouting shardRouting = null;
final IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService != null) {
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
if (indexShard != null) {
shardRouting = indexShard.routingEntry();
}
}
if (shardRouting == null) {
logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]", failure,
shardId.index().name(), shardId.id(), reason);
return;
}
final ShardRouting fShardRouting = shardRouting;
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
failAndRemoveShard(fShardRouting, indexService, true, "engine failure, reason [" + reason + "]", failure);
}
public void handle(final IndexShard.ShardFailure shardFailure) {
final IndexService indexService = indicesService.indexService(shardFailure.routing.shardId().index().name());
final ShardRouting shardRouting = shardFailure.routing;
threadPool.generic().execute(() -> {
synchronized (mutex) {
failAndRemoveShard(shardRouting, shardFailure.indexUUID, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
}
});
}

View File

@ -269,9 +269,9 @@ public class InternalEngineTests extends ESTestCase {
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.EventListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}
}, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
@ -1919,7 +1919,7 @@ public class InternalEngineTests extends ESTestCase {
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getEventListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
try {

View File

@ -222,9 +222,9 @@ public class ShadowEngineTests extends ESTestCase {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.EventListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
try {