Recovery: mapping check during phase2 should be done in cluster state update task

Before phase2 we check verify that the local mapping is in sync with the cluster state mapping (and send & wait on a master update mapping task if not). This check should be done under a cluster state update task to make sure an incoming cluster state update to do not change things while we check.

Closes #7744
This commit is contained in:
Boaz Leskes 2014-09-16 13:08:07 +02:00
parent d17fd26f23
commit 4677d05048
1 changed files with 49 additions and 22 deletions

View File

@ -29,11 +29,14 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -43,6 +46,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -60,10 +64,7 @@ import org.elasticsearch.transport.*;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -307,43 +308,69 @@ public class RecoverySource extends AbstractComponent {
}
private void updateMappingOnMaster() {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
List<DocumentMapper> documentMappersToUpdate = Lists.newArrayList();
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
// we test that the cluster state is in sync with our in memory mapping stored by the mapperService
// we have to do it under the "cluster state update" thread to make sure that one doesn't modify it
// while we're checking
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
}
}
return currentState;
}
@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.error("unexpected error while checking for pending mapping changes", t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (documentMappersToUpdate.isEmpty()) {
return;
}
final CountDownLatch countDownLatch = new CountDownLatch(documentMappersToUpdate.size());
final CountDownLatch updatedOnMaster = new CountDownLatch(documentMappersToUpdate.size());
MappingUpdatedAction.MappingUpdateListener listener = new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
countDownLatch.countDown();
updatedOnMaster.countDown();
}
@Override
public void onFailure(Throwable t) {
logger.debug("{} recovery to {}: failed to update mapping on master", request.shardId(), request.targetNode(), t);
countDownLatch.countDown();
updatedOnMaster.countDown();
}
};
for (DocumentMapper documentMapper : documentMappersToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper, indexService.indexUUID(), listener);
}
try {
if (!countDownLatch.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
if (!updatedOnMaster.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.debug("{} recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]", request.shardId(), request.targetNode(), internalActionTimeout);
}
} catch (InterruptedException e) {