diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 4cdc4887060..8d3ad3de9cd 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.bulk; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; @@ -100,22 +99,33 @@ public class TransportBulkAction extends HandledTransportAction responses = new AtomicArray<>(bulkRequest.requests.size()); if (autoCreateIndex.needToCheck()) { - final Set indices = Sets.newHashSet(); + // Keep track of all unique indices and all unique types per index for the create index requests: + final Map> indicesAndTypes = new HashMap<>(); for (ActionRequest request : bulkRequest.requests) { if (request instanceof DocumentRequest) { DocumentRequest req = (DocumentRequest) request; - if (!indices.contains(req.index())) { - indices.add(req.index()); + Set types = indicesAndTypes.get(req.index()); + if (types == null) { + indicesAndTypes.put(req.index(), types = new HashSet<>()); } + types.add(req.type()); } else { throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); } } - final AtomicInteger counter = new AtomicInteger(indices.size()); + final AtomicInteger counter = new AtomicInteger(indicesAndTypes.size()); ClusterState state = clusterService.state(); - for (final String index : indices) { + for (Map.Entry> entry : indicesAndTypes.entrySet()) { + final String index = entry.getKey(); if (autoCreateIndex.shouldAutoCreate(index, state)) { - createIndexAction.execute(new CreateIndexRequest(bulkRequest).index(index).cause("auto(bulk api)").masterNodeTimeout(bulkRequest.timeout()), new ActionListener() { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(bulkRequest); + createIndexRequest.index(index); + for (String type : entry.getValue()) { + createIndexRequest.mapping(type); + } + createIndexRequest.cause("auto(bulk api)"); + createIndexRequest.masterNodeTimeout(bulkRequest.timeout()); + createIndexAction.execute(createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { if (counter.decrementAndGet() == 0) { diff --git a/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateTests.java b/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateTests.java index e02c2bef8b4..1c3f8f8c9ca 100644 --- a/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateTests.java +++ b/src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateTests.java @@ -29,6 +29,8 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.common.Priority; @@ -604,38 +606,66 @@ public class SimpleIndexTemplateTests extends ElasticsearchIntegrationTest { public void testStrictAliasParsingInIndicesCreatedViaTemplates() throws Exception { // Indexing into a should succeed, because the field mapping for field 'field' is defined in the test mapping. client().admin().indices().preparePutTemplate("template1") - .setTemplate("a") + .setTemplate("a*") .setOrder(0) .addMapping("test", "field", "type=string") .addAlias(new Alias("alias1").filter(termFilter("field", "value"))).get(); // Indexing into b should succeed, because the field mapping for field 'field' is defined in the _default_ mapping and the test type exists. client().admin().indices().preparePutTemplate("template2") - .setTemplate("b") + .setTemplate("b*") .setOrder(0) .addMapping("_default_", "field", "type=string") .addMapping("test") .addAlias(new Alias("alias2").filter(termFilter("field", "value"))).get(); // Indexing into c should succeed, because the field mapping for field 'field' is defined in the _default_ mapping. client().admin().indices().preparePutTemplate("template3") - .setTemplate("c") + .setTemplate("c*") .setOrder(0) .addMapping("_default_", "field", "type=string") .addAlias(new Alias("alias3").filter(termFilter("field", "value"))).get(); // Indexing into d index should fail, since there is field with name 'field' in the mapping client().admin().indices().preparePutTemplate("template4") - .setTemplate("d") + .setTemplate("d*") .setOrder(0) .addAlias(new Alias("alias4").filter(termFilter("field", "value"))).get(); - client().prepareIndex("a", "test", "test").setSource("{}").get(); - client().prepareIndex("b", "test", "test").setSource("{}").get(); - client().prepareIndex("c", "test", "test").setSource("{}").get(); + client().prepareIndex("a1", "test", "test").setSource("{}").get(); + BulkResponse response = client().prepareBulk().add(new IndexRequest("a2", "test", "test").source("{}")).get(); + assertThat(response.hasFailures(), is(false)); + assertThat(response.getItems()[0].isFailed(), equalTo(false)); + assertThat(response.getItems()[0].getIndex(), equalTo("a2")); + assertThat(response.getItems()[0].getType(), equalTo("test")); + assertThat(response.getItems()[0].getId(), equalTo("test")); + assertThat(response.getItems()[0].getVersion(), equalTo(1l)); + + client().prepareIndex("b1", "test", "test").setSource("{}").get(); + response = client().prepareBulk().add(new IndexRequest("b2", "test", "test").source("{}")).get(); + assertThat(response.hasFailures(), is(false)); + assertThat(response.getItems()[0].isFailed(), equalTo(false)); + assertThat(response.getItems()[0].getIndex(), equalTo("b2")); + assertThat(response.getItems()[0].getType(), equalTo("test")); + assertThat(response.getItems()[0].getId(), equalTo("test")); + assertThat(response.getItems()[0].getVersion(), equalTo(1l)); + + client().prepareIndex("c1", "test", "test").setSource("{}").get(); + response = client().prepareBulk().add(new IndexRequest("c2", "test", "test").source("{}")).get(); + assertThat(response.hasFailures(), is(false)); + assertThat(response.getItems()[0].isFailed(), equalTo(false)); + assertThat(response.getItems()[0].getIndex(), equalTo("c2")); + assertThat(response.getItems()[0].getType(), equalTo("test")); + assertThat(response.getItems()[0].getId(), equalTo("test")); + assertThat(response.getItems()[0].getVersion(), equalTo(1l)); + try { - client().prepareIndex("d", "test", "test").setSource("{}").get(); + client().prepareIndex("d1", "test", "test").setSource("{}").get(); fail(); } catch (Exception e) { assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchIllegalArgumentException.class)); assertThat(e.getMessage(), containsString("failed to parse filter for alias [alias4]")); } + response = client().prepareBulk().add(new IndexRequest("d2", "test", "test").source("{}")).get(); + assertThat(response.hasFailures(), is(true)); + assertThat(response.getItems()[0].isFailed(), equalTo(true)); + assertThat(response.getItems()[0].getFailureMessage(), containsString("failed to parse filter for alias [alias4]")); } }