diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java index 89f480e7ff0..dfcde3f0dd3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.shard.IndexShardComponent; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; /** @@ -55,6 +56,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent { void updateIndexingBufferSize(ByteSizeValue indexingBufferSize); + void addFailedEngineListener(FailedEngineListener listener); + /** * Starts the Engine. * @@ -103,6 +106,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent { void recover(RecoveryHandler recoveryHandler) throws EngineException; + static interface FailedEngineListener { + void onFailedEngine(ShardId shardId, Throwable t); + } + /** * Recovery allow to start the recovery process. It is built of three phases. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/EngineClosedException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/EngineClosedException.java index a47cbb6660e..95c61770839 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/EngineClosedException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/EngineClosedException.java @@ -35,4 +35,8 @@ public class EngineClosedException extends IndexShardClosedException { public EngineClosedException(ShardId shardId) { super(shardId); } + + public EngineClosedException(ShardId shardId, Throwable t) { + super(shardId, t); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index cde44207915..0962fbb716c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -57,6 +57,7 @@ import org.elasticsearch.index.translog.Translog; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -128,6 +129,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final ApplySettings applySettings = new ApplySettings(); + private Throwable failedEngine = null; + private final Object failedEngineMutex = new Object(); + private final CopyOnWriteArrayList failedEngineListeners = new CopyOnWriteArrayList(); + @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, @@ -190,6 +195,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } + @Override public void addFailedEngineListener(FailedEngineListener listener) { + failedEngineListeners.add(listener); + } + @Override public void start() throws EngineException { rwl.writeLock().lock(); try { @@ -240,13 +249,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { try { IndexWriter writer = this.indexWriter; if (writer == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } innerCreate(create, writer); dirty = true; possibleMergeNeeded = true; } catch (IOException e) { throw new CreateFailedEngineException(shardId, create, e); + } catch (OutOfMemoryError e) { + failEngine(e); + throw new CreateFailedEngineException(shardId, create, e); } finally { rwl.readLock().unlock(); } @@ -340,7 +352,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { try { IndexWriter writer = this.indexWriter; if (writer == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } innerIndex(index, writer); @@ -348,6 +360,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { possibleMergeNeeded = true; } catch (IOException e) { throw new IndexFailedEngineException(shardId, index, e); + } catch (OutOfMemoryError e) { + failEngine(e); + throw new IndexFailedEngineException(shardId, index, e); } finally { rwl.readLock().unlock(); } @@ -435,13 +450,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { try { IndexWriter writer = this.indexWriter; if (writer == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } innerDelete(delete, writer); dirty = true; possibleMergeNeeded = true; } catch (IOException e) { throw new DeleteFailedEngineException(shardId, delete, e); + } catch (OutOfMemoryError e) { + failEngine(e); + throw new DeleteFailedEngineException(shardId, delete, e); } finally { rwl.readLock().unlock(); } @@ -568,7 +586,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // this engine always acts as if waitForOperations=true IndexWriter currentWriter = indexWriter; if (currentWriter == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } try { // we need to obtain a mutex here, to make sure we don't leave dangling readers @@ -591,12 +609,15 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // an index writer got replaced on us, ignore } catch (Exception e) { if (indexWriter == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } else if (currentWriter != indexWriter) { // an index writer got replaced on us, ignore } else { throw new RefreshFailedEngineException(shardId, e); } + } catch (OutOfMemoryError e) { + failEngine(e); + throw new RefreshFailedEngineException(shardId, e); } } finally { rwl.readLock().unlock(); @@ -605,7 +626,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { @Override public void flush(Flush flush) throws EngineException { if (indexWriter == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } // check outside the lock as well so we can check without blocking on the write lock if (disableFlushCounter > 0) { @@ -621,7 +642,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { rwl.writeLock().lock(); try { if (indexWriter == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } if (disableFlushCounter > 0) { throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); @@ -643,14 +664,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { nrtResource = buildNrtResource(indexWriter); current.markForClose(); translog.newTranslog(newTransactionLogId()); - } catch (IOException e) { + } catch (Exception e) { + throw new FlushFailedEngineException(shardId, e); + } catch (OutOfMemoryError e) { + failEngine(e); throw new FlushFailedEngineException(shardId, e); } } else { try { indexWriter.commit(); translog.newTranslog(newTransactionLogId()); - } catch (IOException e) { + } catch (Exception e) { + throw new FlushFailedEngineException(shardId, e); + } catch (OutOfMemoryError e) { + failEngine(e); throw new FlushFailedEngineException(shardId, e); } } @@ -676,7 +703,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { rwl.readLock().lock(); try { if (indexWriter == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } if (indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) { ((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).enableMerge(); @@ -684,6 +711,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { indexWriter.maybeMerge(); } catch (Exception e) { throw new OptimizeFailedEngineException(shardId, e); + } catch (OutOfMemoryError e) { + failEngine(e); + throw new OptimizeFailedEngineException(shardId, e); } finally { rwl.readLock().unlock(); if (indexWriter != null && indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) { @@ -697,7 +727,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { rwl.readLock().lock(); try { if (indexWriter == null) { - throw new EngineClosedException(shardId); + throw new EngineClosedException(shardId, failedEngine); } if (indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) { ((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).enableMerge(); @@ -712,6 +742,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } catch (Exception e) { throw new OptimizeFailedEngineException(shardId, e); + } catch (OutOfMemoryError e) { + failEngine(e); + throw new OptimizeFailedEngineException(shardId, e); } finally { if (indexWriter != null && indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) { ((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).disableMerge(); @@ -824,13 +857,36 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } @Override public void close() throws ElasticSearchException { + rwl.writeLock().lock(); + try { + innerClose(); + } finally { + rwl.writeLock().unlock(); + } + } + + private void failEngine(Throwable failure) { + synchronized (failedEngineMutex) { + if (failedEngine != null) { + return; + } + logger.warn("failed engine", failure); + failedEngine = failure; + for (FailedEngineListener listener : failedEngineListeners) { + listener.onFailedEngine(shardId, failure); + } + innerClose(); + } + } + + private void innerClose() { if (closed) { return; } indexSettingsService.removeListener(applySettings); closed = true; - rwl.writeLock().lock(); this.versionMap.clear(); + this.failedEngineListeners.clear(); try { if (indexingSearcher.get() != null) { indexingSearcher.get().release(); @@ -847,11 +903,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // ignore } } - } catch (IOException e) { + } catch (Exception e) { logger.debug("failed to rollback writer on close", e); } finally { indexWriter = null; - rwl.writeLock().unlock(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java index 11f7807f1aa..9f899029985 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java @@ -31,6 +31,11 @@ public class IllegalIndexShardStateException extends IndexShardException { this.currentState = currentState; } + public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex) { + super(shardId, "CurrentState[" + currentState + "] ", ex); + this.currentState = currentState; + } + public IndexShardState currentState() { return currentState; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java index e5b1f768131..b51126ee4f1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java @@ -27,4 +27,7 @@ public class IndexShardClosedException extends IllegalIndexShardStateException { super(shardId, IndexShardState.CLOSED, "Closed"); } + public IndexShardClosedException(ShardId shardId, Throwable t) { + super(shardId, IndexShardState.CLOSED, "Closed", t); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c5b35ffc5c5..a874a442c59 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexShardAlreadyExistsException; import org.elasticsearch.index.IndexShardMissingException; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; import org.elasticsearch.index.gateway.IndexShardGatewayService; import org.elasticsearch.index.mapper.DocumentMapper; @@ -49,6 +50,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.recovery.RecoveryFailedException; import org.elasticsearch.index.shard.recovery.RecoverySource; import org.elasticsearch.index.shard.recovery.RecoveryTarget; @@ -92,6 +94,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent