diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 61291c0284b..47b7c6142ba 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -37,6 +38,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; @@ -54,6 +56,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Set; /** * Performs the index operation. @@ -128,6 +131,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation Engine.IndexingOperation[] ops = null; + Set> mappingsToUpdate = null; + BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; @@ -164,7 +169,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added if (op.parsedDoc().mappersAdded()) { - updateMappingOnMaster(indexRequest); + if (mappingsToUpdate == null) { + mappingsToUpdate = Sets.newHashSet(); + } + mappingsToUpdate.add(Tuple.create(indexRequest.index(), indexRequest.type())); } // if we are going to percolate, then we need to keep this op for the postPrimary operation @@ -222,6 +230,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } } + if (mappingsToUpdate != null) { + for (Tuple mappingToUpdate : mappingsToUpdate) { + updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2()); + } + } + if (request.refresh()) { try { indexShard.refresh(new Engine.Refresh(false)); @@ -311,16 +325,16 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } } - private void updateMappingOnMaster(final IndexRequest request) { + private void updateMappingOnMaster(final String index, final String type) { try { - MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService(); - final DocumentMapper documentMapper = mapperService.documentMapper(request.type()); + MapperService mapperService = indicesService.indexServiceSafe(index).mapperService(); + final DocumentMapper documentMapper = mapperService.documentMapper(type); if (documentMapper == null) { // should not happen return; } documentMapper.refreshSource(); - mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), documentMapper.mappingSource()), new ActionListener() { + mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(index, type, documentMapper.mappingSource()), new ActionListener() { @Override public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { // all is well @@ -329,14 +343,14 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation @Override public void onFailure(Throwable e) { try { - logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + documentMapper.mappingSource().string() + "]", e); + logger.warn("failed to update master on updated mapping for index [{}], type [{}] and source [{}]", e, index, type, documentMapper.mappingSource().string()); } catch (IOException e1) { // ignore } } }); } catch (Exception e) { - logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "]", e); + logger.warn("failed to update master on updated mapping for index [{}], type [{}]", e, index, type); } } }