diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 73c57e5ef7e..80b319d5b7a 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -40,7 +39,12 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportService; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * The source recovery accepts recovery requests from other peer shards and start the recovery process from this @@ -55,7 +59,6 @@ public class RecoverySource extends AbstractComponent { private final TransportService transportService; private final IndicesService indicesService; private final RecoverySettings recoverySettings; - private final MappingUpdatedAction mappingUpdatedAction; private final ClusterService clusterService; @@ -64,11 +67,10 @@ public class RecoverySource extends AbstractComponent { @Inject public RecoverySource(Settings settings, TransportService transportService, IndicesService indicesService, - RecoverySettings recoverySettings, MappingUpdatedAction mappingUpdatedAction, ClusterService clusterService) { + RecoverySettings recoverySettings, ClusterService clusterService) { super(settings); this.transportService = transportService; this.indicesService = indicesService; - this.mappingUpdatedAction = mappingUpdatedAction; this.clusterService = clusterService; this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() { @Override @@ -116,9 +118,9 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated()); final RecoverySourceHandler handler; if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) { - handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger); + handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger); } else { - handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger); + handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, logger); } ongoingRecoveries.add(shard, handler); try { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d9ebe4798ee..823742404e3 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; + import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; @@ -30,31 +31,19 @@ import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.CancellableThreads.Interruptable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; -import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -62,7 +51,6 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportRequestOptions; @@ -71,7 +59,9 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -91,9 +81,6 @@ public class RecoverySourceHandler { private final StartRecoveryRequest request; private final RecoverySettings recoverySettings; private final TransportService transportService; - private final ClusterService clusterService; - private final IndexService indexService; - private final MappingUpdatedAction mappingUpdatedAction; protected final RecoveryResponse response; private final CancellableThreads cancellableThreads = new CancellableThreads() { @@ -114,18 +101,14 @@ public class RecoverySourceHandler { public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings, - final TransportService transportService, final ClusterService clusterService, - final IndicesService indicesService, final MappingUpdatedAction mappingUpdatedAction, final ESLogger logger) { + final TransportService transportService, final ESLogger logger) { this.shard = shard; this.request = request; this.recoverySettings = recoverySettings; this.logger = logger; this.transportService = transportService; - this.clusterService = clusterService; this.indexName = this.request.shardId().index().name(); this.shardId = this.request.shardId().id(); - this.indexService = indicesService.indexServiceSafe(indexName); - this.mappingUpdatedAction = mappingUpdatedAction; this.response = new RecoveryResponse(); } @@ -490,9 +473,6 @@ public class RecoverySourceHandler { cancellableThreads.checkForCancel(); StopWatch stopWatch = new StopWatch().start(); - logger.trace("{} recovery [phase2] to {}: updating current mapping to master", request.shardId(), request.targetNode()); - // Ensure that the mappings are synced with the master node - updateMappingOnMaster(); logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode()); // Send all the snapshot's translog operations to the target @@ -546,67 +526,6 @@ public class RecoverySourceHandler { indexName, shardId, request.targetNode(), stopWatch.totalTime()); } - /** - * Ensures that the mapping in the cluster state is the same as the mapping - * in our mapper service. If the mapping is not in sync, sends a request - * to update it in the cluster state and blocks until it has finished - * being updated. - */ - private void updateMappingOnMaster() { - // we test that the cluster state is in sync with our in memory mapping stored by the mapperService - // we have to do it under the "cluster state update" thread to make sure that one doesn't modify it - // while we're checking - final BlockingQueue documentMappersToUpdate = ConcurrentCollections.newBlockingQueue(); - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference mappingCheckException = new AtomicReference<>(); - - // we use immediate as this is a very light weight check and we don't wait to delay recovery - clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new MappingUpdateTask(clusterService, indexService, recoverySettings, latch, documentMappersToUpdate, mappingCheckException, this.cancellableThreads)); - cancellableThreads.execute(new Interruptable() { - @Override - public void run() throws InterruptedException { - latch.await(); - } - }); - if (mappingCheckException.get() != null) { - logger.warn("error during mapping check, failing recovery", mappingCheckException.get()); - throw new ElasticsearchException("error during mapping check", mappingCheckException.get()); - } - if (documentMappersToUpdate.isEmpty()) { - return; - } - final CountDownLatch updatedOnMaster = new CountDownLatch(documentMappersToUpdate.size()); - MappingUpdatedAction.MappingUpdateListener listener = new MappingUpdatedAction.MappingUpdateListener() { - @Override - public void onMappingUpdate() { - updatedOnMaster.countDown(); - } - - @Override - public void onFailure(Throwable t) { - logger.debug("{} recovery to {}: failed to update mapping on master", request.shardId(), request.targetNode(), t); - updatedOnMaster.countDown(); - } - }; - for (DocumentMapper documentMapper : documentMappersToUpdate) { - mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper.type(), documentMapper.mapping(), recoverySettings.internalActionTimeout(), listener); - } - cancellableThreads.execute(new Interruptable() { - @Override - public void run() throws InterruptedException { - try { - if (!updatedOnMaster.await(recoverySettings.internalActionTimeout().millis(), TimeUnit.MILLISECONDS)) { - logger.debug("[{}][{}] recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]", - indexName, shardId, request.targetNode(), recoverySettings.internalActionTimeout()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.debug("interrupted while waiting for mapping to update on master"); - } - } - }); - } - /** * Send the given snapshot's operations to this handler's target node. *

@@ -723,71 +642,4 @@ public class RecoverySourceHandler { '}'; } - // this is a static class since we are holding an instance to the IndexShard - // on ShardRecoveryHandler which can not be GCed if the recovery is canceled - // but this task is still stuck in the queue. This can be problematic if the - // queue piles up and recoveries fail and can lead to OOM or memory pressure if lots of shards - // are created and removed. - private static class MappingUpdateTask extends TimeoutClusterStateUpdateTask { - private final CountDownLatch latch; - private final BlockingQueue documentMappersToUpdate; - private final AtomicReference mappingCheckException; - private final CancellableThreads cancellableThreads; - private ClusterService clusterService; - private IndexService indexService; - private RecoverySettings recoverySettings; - - public MappingUpdateTask(ClusterService clusterService, IndexService indexService, RecoverySettings recoverySettings, CountDownLatch latch, BlockingQueue documentMappersToUpdate, AtomicReference mappingCheckException, CancellableThreads cancellableThreads) { - this.latch = latch; - this.documentMappersToUpdate = documentMappersToUpdate; - this.mappingCheckException = mappingCheckException; - this.clusterService = clusterService; - this.indexService = indexService; - this.recoverySettings = recoverySettings; - this.cancellableThreads = cancellableThreads; - } - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public TimeValue timeout() { - return recoverySettings.internalActionTimeout(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - if (cancellableThreads.isCancelled() == false) { // no need to run this if recovery is canceled - IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName()); - ImmutableOpenMap metaDataMappings = null; - if (indexMetaData != null) { - metaDataMappings = indexMetaData.getMappings(); - } - // default mapping should not be sent back, it can only be updated by put mapping API, and its - // a full in place replace, we don't want to override a potential update coming into it - for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) { - - MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type()); - if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) { - // not on master yet in the right form - documentMappersToUpdate.add(documentMapper); - } - } - } - return currentState; - } - - @Override - public void onFailure(String source, Throwable t) { - mappingCheckException.set(t); - latch.countDown(); - } - } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 187215ebfa9..9e80accf5e9 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -19,16 +19,9 @@ package org.elasticsearch.indices.recovery; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; -import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -43,8 +36,8 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { private final StartRecoveryRequest request; private static final Translog.View EMPTY_VIEW = new EmptyView(); - public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) { - super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger); + public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) { + super(shard, request, recoverySettings, transportService, logger); this.shard = shard; this.request = request; }