Better handling when a shard hits OOM, closes #848.

This commit is contained in:
kimchy 2011-04-11 21:10:56 +03:00
parent decb5fa898
commit e8503c1455
6 changed files with 131 additions and 14 deletions

View File

@ -38,6 +38,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
/**
@ -55,6 +56,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
void updateIndexingBufferSize(ByteSizeValue indexingBufferSize);
/**
 * Registers a listener that receives {@link FailedEngineListener#onFailedEngine} callbacks.
 *
 * @param listener the listener to register
 */
void addFailedEngineListener(FailedEngineListener listener);
/**
* Starts the Engine.
*
@ -103,6 +106,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
void recover(RecoveryHandler recoveryHandler) throws EngineException;
/**
 * Callback interface for engine failures.
 */
static interface FailedEngineListener {
/**
 * Reports an engine failure.
 *
 * @param shardId the shard id supplied by the reporting engine
 * @param t       the failure being reported
 */
void onFailedEngine(ShardId shardId, Throwable t);
}
/**
* Recovery allows starting the recovery process. It is built of three phases.
*

View File

@ -35,4 +35,8 @@ public class EngineClosedException extends IndexShardClosedException {
/**
 * Creates the exception for the given shard, without a cause.
 *
 * @param shardId the shard id, forwarded to the superclass constructor
 */
public EngineClosedException(ShardId shardId) {
super(shardId);
}
/**
 * Creates the exception for the given shard with an underlying cause.
 *
 * @param shardId the shard id, forwarded to the superclass constructor
 * @param t       the cause, forwarded to the superclass constructor
 */
public EngineClosedException(ShardId shardId, Throwable t) {
super(shardId, t);
}
}

View File

@ -57,6 +57,7 @@ import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -128,6 +129,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final ApplySettings applySettings = new ApplySettings();
// The failure recorded by failEngine(); stays null until a failure is recorded.
private Throwable failedEngine = null;
// Monitor that failEngine() synchronizes on while recording the failure and notifying listeners.
private final Object failedEngineMutex = new Object();
// Listeners added through addFailedEngineListener(); notified by failEngine() and cleared by innerClose().
private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<FailedEngineListener>();
@Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
@ -190,6 +195,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
/**
 * Adds the listener to the list of listeners that failEngine() notifies.
 *
 * @param listener the listener to add
 */
@Override public void addFailedEngineListener(FailedEngineListener listener) {
failedEngineListeners.add(listener);
}
@Override public void start() throws EngineException {
rwl.writeLock().lock();
try {
@ -240,13 +249,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
innerCreate(create, writer);
dirty = true;
possibleMergeNeeded = true;
} catch (IOException e) {
throw new CreateFailedEngineException(shardId, create, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new CreateFailedEngineException(shardId, create, e);
} finally {
rwl.readLock().unlock();
}
@ -340,7 +352,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
innerIndex(index, writer);
@ -348,6 +360,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
possibleMergeNeeded = true;
} catch (IOException e) {
throw new IndexFailedEngineException(shardId, index, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new IndexFailedEngineException(shardId, index, e);
} finally {
rwl.readLock().unlock();
}
@ -435,13 +450,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
innerDelete(delete, writer);
dirty = true;
possibleMergeNeeded = true;
} catch (IOException e) {
throw new DeleteFailedEngineException(shardId, delete, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new DeleteFailedEngineException(shardId, delete, e);
} finally {
rwl.readLock().unlock();
}
@ -568,7 +586,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// this engine always acts as if waitForOperations=true
IndexWriter currentWriter = indexWriter;
if (currentWriter == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
try {
// we need to obtain a mutex here, to make sure we don't leave dangling readers
@ -591,12 +609,15 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// an index writer got replaced on us, ignore
} catch (Exception e) {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
} else if (currentWriter != indexWriter) {
// an index writer got replaced on us, ignore
} else {
throw new RefreshFailedEngineException(shardId, e);
}
} catch (OutOfMemoryError e) {
failEngine(e);
throw new RefreshFailedEngineException(shardId, e);
}
} finally {
rwl.readLock().unlock();
@ -605,7 +626,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
@Override public void flush(Flush flush) throws EngineException {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
// check outside the lock as well so we can check without blocking on the write lock
if (disableFlushCounter > 0) {
@ -621,7 +642,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
rwl.writeLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
@ -643,14 +664,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
nrtResource = buildNrtResource(indexWriter);
current.markForClose();
translog.newTranslog(newTransactionLogId());
} catch (IOException e) {
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
}
} else {
try {
indexWriter.commit();
translog.newTranslog(newTransactionLogId());
} catch (IOException e) {
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
}
}
@ -676,7 +703,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
rwl.readLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
if (indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).enableMerge();
@ -684,6 +711,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
indexWriter.maybeMerge();
} catch (Exception e) {
throw new OptimizeFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new OptimizeFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
if (indexWriter != null && indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
@ -697,7 +727,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
rwl.readLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
throw new EngineClosedException(shardId, failedEngine);
}
if (indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).enableMerge();
@ -712,6 +742,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
} catch (Exception e) {
throw new OptimizeFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new OptimizeFailedEngineException(shardId, e);
} finally {
if (indexWriter != null && indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).disableMerge();
@ -824,13 +857,36 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
/**
 * Closes the engine. The write lock is held for the whole close, and the actual
 * work is delegated to innerClose().
 *
 * @throws ElasticSearchException declared by the signature
 */
@Override public void close() throws ElasticSearchException {
rwl.writeLock().lock();
try {
innerClose();
} finally {
// always release the write lock, even if innerClose() throws
rwl.writeLock().unlock();
}
}
/**
 * Marks this engine as failed, notifies the registered failed-engine listeners and
 * closes the engine. Only the first reported failure is acted upon; later calls
 * return immediately.
 *
 * @param failure the error that caused the engine to fail
 */
private void failEngine(Throwable failure) {
    synchronized (failedEngineMutex) {
        if (failedEngine != null) {
            // a failure has already been recorded and handled
            return;
        }
        logger.warn("failed engine", failure);
        failedEngine = failure;
        try {
            for (FailedEngineListener listener : failedEngineListeners) {
                listener.onFailedEngine(shardId, failure);
            }
        } finally {
            // Close even when a listener throws: failedEngine is already set, so any later
            // failEngine() call returns early and would never reach innerClose().
            innerClose();
        }
    }
}
private void innerClose() {
if (closed) {
return;
}
indexSettingsService.removeListener(applySettings);
closed = true;
rwl.writeLock().lock();
this.versionMap.clear();
this.failedEngineListeners.clear();
try {
if (indexingSearcher.get() != null) {
indexingSearcher.get().release();
@ -847,11 +903,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// ignore
}
}
} catch (IOException e) {
} catch (Exception e) {
logger.debug("failed to rollback writer on close", e);
} finally {
indexWriter = null;
rwl.writeLock().unlock();
}
}

View File

@ -31,6 +31,11 @@ public class IllegalIndexShardStateException extends IndexShardException {
this.currentState = currentState;
}
/**
 * Creates the exception for a shard that is in an unexpected state, keeping the
 * underlying cause.
 *
 * @param shardId      the shard id, forwarded to the superclass constructor
 * @param currentState the state the shard was in; included in the message and
 *                     returned by {@link #currentState()}
 * @param msg          detail text appended to the message after the state prefix
 * @param ex           the cause, forwarded to the superclass constructor
 */
public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex) {
    // msg was previously ignored, so the caller's detail text never reached the message
    super(shardId, "CurrentState[" + currentState + "] " + msg, ex);
    this.currentState = currentState;
}
/**
 * Returns the shard state that was recorded when this exception was created.
 *
 * @return the recorded shard state
 */
public IndexShardState currentState() {
return currentState;
}

View File

@ -27,4 +27,7 @@ public class IndexShardClosedException extends IllegalIndexShardStateException {
super(shardId, IndexShardState.CLOSED, "Closed");
}
/**
 * Creates the exception for a shard in the {@link IndexShardState#CLOSED} state,
 * keeping the underlying cause.
 *
 * @param shardId the shard id, forwarded to the superclass constructor
 * @param t       the cause, forwarded to the superclass constructor
 */
public IndexShardClosedException(ShardId shardId, Throwable t) {
super(shardId, IndexShardState.CLOSED, "Closed", t);
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -49,6 +50,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.recovery.RecoveryFailedException;
import org.elasticsearch.index.shard.recovery.RecoverySource;
import org.elasticsearch.index.shard.recovery.RecoveryTarget;
@ -92,6 +94,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final Object mutex = new Object();
// Single handler instance that is registered as failed-engine listener on the engine of each shard created here.
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
@Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, RecoveryTarget recoveryTarget, RecoverySource recoverySource,
ShardStateAction shardStateAction,
@ -405,6 +409,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
InternalIndexShard indexShard = (InternalIndexShard) indexService.createShard(shardId);
indexShard.routingEntry(shardRouting);
indexShard.engine().addFailedEngineListener(failedEngineHandler);
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Exception e) {
@ -550,4 +555,42 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
}
/**
 * Reacts to an engine failure by cleaning the failed shard on this node and
 * reporting the shard as failed.
 */
private class FailedEngineHandler implements Engine.FailedEngineListener {

    /**
     * Looks up the routing entry of the failed shard and, when one is found, schedules
     * the shard cleanup and the shard-failed notification on the cached thread pool.
     * When no routing entry can be found, only a warning is logged.
     *
     * @param shardId the shard whose engine failed
     * @param failure the error that failed the engine
     */
    @Override public void onFailedEngine(final ShardId shardId, final Throwable failure) {
        final IndexService indexService = indicesService.indexService(shardId.index().name());
        final IndexShard failedShard = indexService == null ? null : indexService.shard(shardId.id());
        final ShardRouting failedRouting = failedShard == null ? null : failedShard.routingEntry();
        if (failedRouting == null) {
            logger.warn("[{}][{}] engine failed, but can't find index shard", shardId.index().name(), shardId.id());
            return;
        }
        // the actual work is handed off to the cached pool rather than done on the calling thread
        threadPool.cached().execute(new Runnable() {
            @Override public void run() {
                synchronized (mutex) {
                    cleanFailedShard(indexService, shardId, failure);
                    reportFailedShard(indexService, failedRouting, shardId, failure);
                }
            }
        });
    }

    /** Cleans the shard if the index service still has it; problems are logged, not propagated. */
    private void cleanFailedShard(IndexService indexService, ShardId shardId, Throwable failure) {
        if (!indexService.hasShard(shardId.id())) {
            return;
        }
        try {
            indexService.cleanShard(shardId.id(), "engine failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
        } catch (IndexShardMissingException ignored) {
            // the node got closed on us, ignore it
        } catch (Exception cleanFailure) {
            logger.warn("[{}][{}] failed to delete shard after failed engine", cleanFailure, indexService.index().name(), shardId.id());
        }
    }

    /** Reports the shard as failed through the shard state action; problems are logged, not propagated. */
    private void reportFailedShard(IndexService indexService, ShardRouting routing, ShardId shardId, Throwable failure) {
        try {
            // NOTE(review): this call is unqualified while the cleanup message uses
            // ExceptionsHelper.detailedMessage - presumably the same method via a static import; confirm and unify.
            shardStateAction.shardFailed(routing, "engine failure, message [" + detailedMessage(failure) + "]");
        } catch (Exception reportFailure) {
            logger.warn("[{}][{}] failed to mark shard as failed after a failed engine", reportFailure, indexService.index().name(), shardId.id());
        }
    }
}
}