Mapping update task back references already closed index shard

In the ShardRecoveryHandler we issue cluster update tasks to update the
mapping. The annonymous inner class backreferences the ShardRecoveryHandler
which holds a potentially large IndexShard object (which references buffers & caches etc)
If the queue of update tasks piles up and recoveries get cancled and/or shards are closed
the ShardRecoveryHandler can't be GCed. This commit moves the update task into a static
inner class to allos the GC to do its job.
This commit is contained in:
Simon Willnauer 2015-02-08 22:53:53 +01:00
parent 1167beed48
commit b3b1a11a64
1 changed files with 70 additions and 43 deletions

View File

@ -482,50 +482,9 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();
// we use immediate as this is a very light weight check and we don't wait to delay recovery
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public TimeValue timeout() {
return recoverySettings.internalActionTimeout();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
}
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
mappingCheckException.set(t);
latch.countDown();
}
});
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new MappingUpdateTask(clusterService, indexService, recoverySettings, latch, documentMappersToUpdate, mappingCheckException, this.cancellableThreads));
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
@ -659,4 +618,72 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
", targetNode=" + request.targetNode() +
'}';
}
// this is a static class since we are holding an instance to the IndexShard
// on ShardRecoveryHandler which can not be GCed if the recovery is canceled
// but this task is still stuck in the queue. This can be problematic if the
// queue piles up and recoveries fail and can lead to OOM or memory pressure if lots of shards
// are created and removed.
private static class MappingUpdateTask extends TimeoutClusterStateUpdateTask {
private final CountDownLatch latch;
private final BlockingQueue<DocumentMapper> documentMappersToUpdate;
private final AtomicReference<Throwable> mappingCheckException;
private final CancellableThreads cancellableThreads;
private ClusterService clusterService;
private IndexService indexService;
private RecoverySettings recoverySettings;
public MappingUpdateTask(ClusterService clusterService, IndexService indexService, RecoverySettings recoverySettings, CountDownLatch latch, BlockingQueue<DocumentMapper> documentMappersToUpdate, AtomicReference<Throwable> mappingCheckException, CancellableThreads cancellableThreads) {
this.latch = latch;
this.documentMappersToUpdate = documentMappersToUpdate;
this.mappingCheckException = mappingCheckException;
this.clusterService = clusterService;
this.indexService = indexService;
this.recoverySettings = recoverySettings;
this.cancellableThreads = cancellableThreads;
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public TimeValue timeout() {
return recoverySettings.internalActionTimeout();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (cancellableThreads.isCancelled() == false) { // no need to run this if recovery is canceled
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming into it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
}
}
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
mappingCheckException.set(t);
latch.countDown();
}
}
}