Merge pull request #11207 from jpountz/fix/recovery_no_mapping_update

Recovery: No need to send mappings to the master node on phase 2.
This commit is contained in:
Adrien Grand 2015-05-18 16:50:03 +02:00
commit a2a41c0115
3 changed files with 16 additions and 169 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -40,7 +39,12 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
@ -55,7 +59,6 @@ public class RecoverySource extends AbstractComponent {
private final TransportService transportService;
private final IndicesService indicesService;
private final RecoverySettings recoverySettings;
private final MappingUpdatedAction mappingUpdatedAction;
private final ClusterService clusterService;
@ -64,11 +67,10 @@ public class RecoverySource extends AbstractComponent {
@Inject
public RecoverySource(Settings settings, TransportService transportService, IndicesService indicesService,
RecoverySettings recoverySettings, MappingUpdatedAction mappingUpdatedAction, ClusterService clusterService) {
RecoverySettings recoverySettings, ClusterService clusterService) {
super(settings);
this.transportService = transportService;
this.indicesService = indicesService;
this.mappingUpdatedAction = mappingUpdatedAction;
this.clusterService = clusterService;
this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override
@ -116,9 +118,9 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
final RecoverySourceHandler handler;
if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) {
handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
} else {
handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
}
ongoingRecoveries.add(shard, handler);
try {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
@ -30,31 +31,19 @@ import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
@ -62,7 +51,6 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
@ -71,7 +59,9 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -91,9 +81,6 @@ public class RecoverySourceHandler {
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
private final ClusterService clusterService;
private final IndexService indexService;
private final MappingUpdatedAction mappingUpdatedAction;
protected final RecoveryResponse response;
private final CancellableThreads cancellableThreads = new CancellableThreads() {
@ -114,18 +101,14 @@ public class RecoverySourceHandler {
public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final ClusterService clusterService,
final IndicesService indicesService, final MappingUpdatedAction mappingUpdatedAction, final ESLogger logger) {
final TransportService transportService, final ESLogger logger) {
this.shard = shard;
this.request = request;
this.recoverySettings = recoverySettings;
this.logger = logger;
this.transportService = transportService;
this.clusterService = clusterService;
this.indexName = this.request.shardId().index().name();
this.shardId = this.request.shardId().id();
this.indexService = indicesService.indexServiceSafe(indexName);
this.mappingUpdatedAction = mappingUpdatedAction;
this.response = new RecoveryResponse();
}
@ -490,9 +473,6 @@ public class RecoverySourceHandler {
cancellableThreads.checkForCancel();
StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase2] to {}: updating current mapping to master", request.shardId(), request.targetNode());
// Ensure that the mappings are synced with the master node
updateMappingOnMaster();
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
// Send all the snapshot's translog operations to the target
@ -546,67 +526,6 @@ public class RecoverySourceHandler {
indexName, shardId, request.targetNode(), stopWatch.totalTime());
}
/**
* Ensures that the mapping in the cluster state is the same as the mapping
* in our mapper service. If the mapping is not in sync, sends a request
* to update it in the cluster state and blocks until it has finished
* being updated.
*/
private void updateMappingOnMaster() {
// we test that the cluster state is in sync with our in memory mapping stored by the mapperService
// we have to do it under the "cluster state update" thread to make sure that one doesn't modify it
// while we're checking
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();
// we use immediate as this is a very light weight check and we don't wait to delay recovery
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new MappingUpdateTask(clusterService, indexService, recoverySettings, latch, documentMappersToUpdate, mappingCheckException, this.cancellableThreads));
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
latch.await();
}
});
if (mappingCheckException.get() != null) {
logger.warn("error during mapping check, failing recovery", mappingCheckException.get());
throw new ElasticsearchException("error during mapping check", mappingCheckException.get());
}
if (documentMappersToUpdate.isEmpty()) {
return;
}
final CountDownLatch updatedOnMaster = new CountDownLatch(documentMappersToUpdate.size());
MappingUpdatedAction.MappingUpdateListener listener = new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
updatedOnMaster.countDown();
}
@Override
public void onFailure(Throwable t) {
logger.debug("{} recovery to {}: failed to update mapping on master", request.shardId(), request.targetNode(), t);
updatedOnMaster.countDown();
}
};
for (DocumentMapper documentMapper : documentMappersToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper.type(), documentMapper.mapping(), recoverySettings.internalActionTimeout(), listener);
}
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
try {
if (!updatedOnMaster.await(recoverySettings.internalActionTimeout().millis(), TimeUnit.MILLISECONDS)) {
logger.debug("[{}][{}] recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]",
indexName, shardId, request.targetNode(), recoverySettings.internalActionTimeout());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.debug("interrupted while waiting for mapping to update on master");
}
}
});
}
/**
* Send the given snapshot's operations to this handler's target node.
* <p/>
@ -723,71 +642,4 @@ public class RecoverySourceHandler {
'}';
}
// this is a static class since we are holding an instance to the IndexShard
// on ShardRecoveryHandler which can not be GCed if the recovery is canceled
// but this task is still stuck in the queue. This can be problematic if the
// queue piles up and recoveries fail and can lead to OOM or memory pressure if lots of shards
// are created and removed.
private static class MappingUpdateTask extends TimeoutClusterStateUpdateTask {
private final CountDownLatch latch;
private final BlockingQueue<DocumentMapper> documentMappersToUpdate;
private final AtomicReference<Throwable> mappingCheckException;
private final CancellableThreads cancellableThreads;
private ClusterService clusterService;
private IndexService indexService;
private RecoverySettings recoverySettings;
public MappingUpdateTask(ClusterService clusterService, IndexService indexService, RecoverySettings recoverySettings, CountDownLatch latch, BlockingQueue<DocumentMapper> documentMappersToUpdate, AtomicReference<Throwable> mappingCheckException, CancellableThreads cancellableThreads) {
this.latch = latch;
this.documentMappersToUpdate = documentMappersToUpdate;
this.mappingCheckException = mappingCheckException;
this.clusterService = clusterService;
this.indexService = indexService;
this.recoverySettings = recoverySettings;
this.cancellableThreads = cancellableThreads;
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public TimeValue timeout() {
return recoverySettings.internalActionTimeout();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (cancellableThreads.isCancelled() == false) { // no need to run this if recovery is canceled
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming into it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
}
}
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
mappingCheckException.set(t);
latch.countDown();
}
}
}

View File

@ -19,16 +19,9 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -43,8 +36,8 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final StartRecoveryRequest request;
private static final Translog.View EMPTY_VIEW = new EmptyView();
public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) {
super(shard, request, recoverySettings, transportService, logger);
this.shard = shard;
this.request = request;
}