[Recovery] only send mapping updates to master if needed

The change added in #6762 helps making sure the pending mapping updates are processed on all nodes to prevent moving shards to nodes which are not yet fully aware of the new mapping. However it introduced a racing condition delete_mapping operations, potentially causing a type to be added after it's deletion. This commit solves this by only sending a mapping update if the mapping source has actually changed.

Closes #6772
This commit is contained in:
Boaz Leskes 2014-07-07 21:11:59 +02:00
parent 34893c0570
commit 867d88795b
1 changed files with 14 additions and 1 deletions

View File

@ -27,10 +27,13 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -280,6 +283,11 @@ public class RecoverySource extends AbstractComponent {
} }
private void updateMappingOnMaster() { private void updateMappingOnMaster() {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
List<DocumentMapper> documentMappersToUpdate = Lists.newArrayList(); List<DocumentMapper> documentMappersToUpdate = Lists.newArrayList();
for (DocumentMapper documentMapper : indexService.mapperService()) { for (DocumentMapper documentMapper : indexService.mapperService()) {
// default mapping should not be sent back, it can only be updated by put mapping API, and its // default mapping should not be sent back, it can only be updated by put mapping API, and its
@ -287,8 +295,13 @@ public class RecoverySource extends AbstractComponent {
if (documentMapper.type().equals(MapperService.DEFAULT_MAPPING)) { if (documentMapper.type().equals(MapperService.DEFAULT_MAPPING)) {
continue; continue;
} }
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper); documentMappersToUpdate.add(documentMapper);
} }
}
if (documentMappersToUpdate.isEmpty()) { if (documentMappersToUpdate.isEmpty()) {
return; return;
} }