diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java index 01c049c7c80..a8c87cef1b7 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java @@ -482,50 +482,9 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { final BlockingQueue documentMappersToUpdate = ConcurrentCollections.newBlockingQueue(); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference mappingCheckException = new AtomicReference<>(); + // we use immediate as this is a very light weight check and we don't wait to delay recovery - clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() { - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public TimeValue timeout() { - return recoverySettings.internalActionTimeout(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName()); - ImmutableOpenMap metaDataMappings = null; - if (indexMetaData != null) { - metaDataMappings = indexMetaData.getMappings(); - } - // default mapping should not be sent back, it can only be updated by put mapping API, and its - // a full in place replace, we don't want to override a potential update coming it - for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) { - - MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type()); - if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) { - // not on master yet in the right form - documentMappersToUpdate.add(documentMapper); - } - } - return currentState; - } - - @Override - public void onFailure(String source, Throwable t) { - mappingCheckException.set(t); - latch.countDown(); - } - }); + clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new MappingUpdateTask(clusterService, indexService, recoverySettings, latch, documentMappersToUpdate, mappingCheckException, this.cancellableThreads)); cancellableThreads.execute(new Interruptable() { @Override public void run() throws InterruptedException { @@ -659,4 +618,72 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { ", targetNode=" + request.targetNode() + '}'; } + + // this is a static class since we are holding an instance to the IndexShard + // on ShardRecoveryHandler which can not be GCed if the recovery is canceled + // but this task is still stuck in the queue. This can be problematic if the + // queue piles up and recoveries fail and can lead to OOM or memory pressure if lots of shards + // are created and removed. + private static class MappingUpdateTask extends TimeoutClusterStateUpdateTask { + private final CountDownLatch latch; + private final BlockingQueue documentMappersToUpdate; + private final AtomicReference mappingCheckException; + private final CancellableThreads cancellableThreads; + private ClusterService clusterService; + private IndexService indexService; + private RecoverySettings recoverySettings; + + public MappingUpdateTask(ClusterService clusterService, IndexService indexService, RecoverySettings recoverySettings, CountDownLatch latch, BlockingQueue documentMappersToUpdate, AtomicReference mappingCheckException, CancellableThreads cancellableThreads) { + this.latch = latch; + this.documentMappersToUpdate = documentMappersToUpdate; + this.mappingCheckException = mappingCheckException; + this.clusterService = clusterService; + this.indexService = indexService; + this.recoverySettings = recoverySettings; + this.cancellableThreads = cancellableThreads; + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public TimeValue timeout() { + return recoverySettings.internalActionTimeout(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + if (cancellableThreads.isCancelled() == false) { // no need to run this if recovery is canceled + IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName()); + ImmutableOpenMap metaDataMappings = null; + if (indexMetaData != null) { + metaDataMappings = indexMetaData.getMappings(); + } + // default mapping should not be sent back, it can only be updated by put mapping API, and its + // a full in place replace, we don't want to override a potential update coming into it + for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) { + + MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type()); + if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) { + // not on master yet in the right form + documentMappersToUpdate.add(documentMapper); + } + } + } + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + mappingCheckException.set(t); + latch.countDown(); + } + } }