updating mapping when using bulk API should be done post bulk processing, so if there is a bulk requests with many small updates to the same index/type doc structure, we only issue one

This commit is contained in:
Shay Banon 2012-02-19 15:55:01 +02:00
parent 94d64c0be4
commit 8b92b2b780
1 changed files with 21 additions and 7 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -37,6 +38,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
@ -54,6 +56,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
/** /**
* Performs the index operation. * Performs the index operation.
@ -128,6 +131,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Engine.IndexingOperation[] ops = null; Engine.IndexingOperation[] ops = null;
Set<Tuple<String, String>> mappingsToUpdate = null;
BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
for (int i = 0; i < request.items().length; i++) { for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i]; BulkItemRequest item = request.items()[i];
@ -164,7 +169,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
// update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
if (op.parsedDoc().mappersAdded()) { if (op.parsedDoc().mappersAdded()) {
updateMappingOnMaster(indexRequest); if (mappingsToUpdate == null) {
mappingsToUpdate = Sets.newHashSet();
}
mappingsToUpdate.add(Tuple.create(indexRequest.index(), indexRequest.type()));
} }
// if we are going to percolate, then we need to keep this op for the postPrimary operation // if we are going to percolate, then we need to keep this op for the postPrimary operation
@ -222,6 +230,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} }
} }
if (mappingsToUpdate != null) {
for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2());
}
}
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh(false)); indexShard.refresh(new Engine.Refresh(false));
@ -311,16 +325,16 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} }
} }
private void updateMappingOnMaster(final IndexRequest request) { private void updateMappingOnMaster(final String index, final String type) {
try { try {
MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService(); MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
final DocumentMapper documentMapper = mapperService.documentMapper(request.type()); final DocumentMapper documentMapper = mapperService.documentMapper(type);
if (documentMapper == null) { // should not happen if (documentMapper == null) { // should not happen
return; return;
} }
documentMapper.refreshSource(); documentMapper.refreshSource();
mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), documentMapper.mappingSource()), new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() { mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(index, type, documentMapper.mappingSource()), new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override @Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well // all is well
@ -329,14 +343,14 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override @Override
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
try { try {
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + documentMapper.mappingSource().string() + "]", e); logger.warn("failed to update master on updated mapping for index [{}], type [{}] and source [{}]", e, index, type, documentMapper.mappingSource().string());
} catch (IOException e1) { } catch (IOException e1) {
// ignore // ignore
} }
} }
}); });
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "]", e); logger.warn("failed to update master on updated mapping for index [{}], type [{}]", e, index, type);
} }
} }
} }