diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 319a26afc71..bff9e24d0e9 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk; import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ElasticsearchWrapperException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -44,6 +45,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -142,7 +144,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation final BulkShardRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); Engine.IndexingOperation[] ops = null; - Set> mappingsToUpdate = null; + final Set> mappingsToUpdate = Sets.newHashSet(); BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; long[] preVersions = new long[request.items().length]; @@ -154,21 +156,26 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation preVersions[requestIndex] = indexRequest.version(); preVersionTypes[requestIndex] = indexRequest.versionType(); try { - WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true); - // add the response - IndexResponse indexResponse = result.response(); - responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse); - if (result.mappingToUpdate != null) { - if (mappingsToUpdate == null) { - mappingsToUpdate = Sets.newHashSet(); + try { + WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true); + // add the response + IndexResponse indexResponse = result.response(); + responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse); + if (result.mappingToUpdate != null) { + mappingsToUpdate.add(result.mappingToUpdate); } - mappingsToUpdate.add(result.mappingToUpdate); - } - if (result.op != null) { - if (ops == null) { - ops = new Engine.IndexingOperation[request.items().length]; + if (result.op != null) { + if (ops == null) { + ops = new Engine.IndexingOperation[request.items().length]; + } + ops[requestIndex] = result.op; } - ops[requestIndex] = result.op; + } catch (WriteFailure e){ + Tuple mappingsToUpdateOnFailure = e.mappingsToUpdate; + if (mappingsToUpdateOnFailure != null) { + mappingsToUpdate.add(mappingsToUpdateOnFailure); + } + throw e.getCause(); } } catch (Throwable e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it @@ -177,6 +184,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation for (int j = 0; j < requestIndex; j++) { applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]); } + for (Tuple mappingToUpdate : mappingsToUpdate) { + updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2()); + } throw (ElasticsearchException) e; } if (e instanceof ElasticsearchException && ((ElasticsearchException) e).status() == RestStatus.CONFLICT) { @@ -246,9 +256,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse); if (result.mappingToUpdate != null) { - if (mappingsToUpdate == null) { - mappingsToUpdate = Sets.newHashSet(); - } mappingsToUpdate.add(result.mappingToUpdate); } if (result.op != null) { @@ -284,8 +291,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation // we can't try any more responses[requestIndex] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), t)); - ; - request.items()[requestIndex] = null; // do not send to replicas } } else { @@ -339,10 +344,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } - if (mappingsToUpdate != null) { - for (Tuple mappingToUpdate : mappingsToUpdate) { - updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2()); - } + for (Tuple mappingToUpdate : mappingsToUpdate) { + updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2()); } if (request.refresh()) { @@ -375,6 +378,17 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } + static class WriteFailure extends ElasticsearchException implements ElasticsearchWrapperException { + @Nullable + final Tuple mappingsToUpdate; + + WriteFailure(Throwable cause, Tuple mappingsToUpdate) { + super(null, cause); + assert cause != null; + this.mappingsToUpdate = mappingsToUpdate; + } + } + private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState, IndexShard indexShard, boolean processed) { @@ -393,33 +407,41 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); + // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added + Tuple mappingsToUpdate = null; + long version; boolean created; Engine.IndexingOperation op; - if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); - indexShard.index(index); - version = index.version(); - op = index; - created = index.created(); - } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); - indexShard.create(create); - version = create.version(); - op = create; - created = true; + try { + if (indexRequest.opType() == IndexRequest.OpType.INDEX) { + Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + if (index.parsedDoc().mappingsModified()) { + mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); + } + indexShard.index(index); + version = index.version(); + op = index; + created = index.created(); + } else { + Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + if (create.parsedDoc().mappingsModified()) { + mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); + } + indexShard.create(create); + version = create.version(); + op = create; + created = true; + } + // update the version on request so it will happen on the replicas + indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); + indexRequest.version(version); + } catch (Throwable t) { + throw new WriteFailure(t, mappingsToUpdate); } - // update the version on request so it will happen on the replicas - indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); - indexRequest.version(version); assert indexRequest.versionType().validateVersion(indexRequest.version()); - // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added - Tuple mappingsToUpdate = null; - if (op.parsedDoc().mappingsModified()) { - mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); - } IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version, created); return new WriteResult(indexResponse, mappingsToUpdate, op); diff --git a/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationTests.java b/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationTests.java new file mode 100644 index 00000000000..5a3c3188fe2 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationTests.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.action.bulk; + +import com.google.common.base.Charsets; +import com.google.common.base.Predicate; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import static org.hamcrest.Matchers.*; +import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath; + +import org.junit.Test; + +@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.SUITE, numNodes=1) +public class BulkIntegrationTests extends ElasticsearchIntegrationTest{ + + @Test + public void testBulkIndexCreatesMapping() throws Exception { + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/bulk-log.json"); + BulkRequestBuilder bulkBuilder = new BulkRequestBuilder(client()); + bulkBuilder.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), true, null, null); + bulkBuilder.execute().actionGet(); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + try { + GetMappingsResponse mappingsResponse = client().admin().indices().getMappings(new GetMappingsRequest()).get(); + return mappingsResponse.getMappings().containsKey("logstash-2014.03.30"); + } catch (Throwable t) { + return false; + } + } + }); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + try { + GetMappingsResponse mappingsResponse = client().admin().indices().getMappings(new GetMappingsRequest()).get(); + return mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"); + } catch (Throwable t) { + return false; + } + } + }); + ensureYellow(); + GetMappingsResponse mappingsResponse = client().admin().indices().getMappings(new GetMappingsRequest()).get(); + assertThat(mappingsResponse.mappings().size(), equalTo(1)); + assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30")); + assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs")); + } +} diff --git a/src/test/java/org/elasticsearch/action/bulk/bulk-log.json b/src/test/java/org/elasticsearch/action/bulk/bulk-log.json new file mode 100644 index 00000000000..9c3663c3f63 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bulk/bulk-log.json @@ -0,0 +1,24 @@ +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug2/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug2/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug2/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{"index":{"_index":"logstash-2014.03.30","_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug2/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"}