mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
bulk: Fields defined in the _default_
mapping of an index template should be picked up when an index alias filter is parsed if a new index is introduced when a document is indexed into an index that doesn't exist yet via the bulk api.
Closes #10609
This commit is contained in:
parent
15d58d91f1
commit
dd4a22bfed
@ -21,7 +21,6 @@ package org.elasticsearch.action.bulk;
|
|||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
@ -100,22 +99,33 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||||||
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
|
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
|
||||||
|
|
||||||
if (autoCreateIndex.needToCheck()) {
|
if (autoCreateIndex.needToCheck()) {
|
||||||
final Set<String> indices = Sets.newHashSet();
|
// Keep track of all unique indices and all unique types per index for the create index requests:
|
||||||
|
final Map<String, Set<String>> indicesAndTypes = new HashMap<>();
|
||||||
for (ActionRequest request : bulkRequest.requests) {
|
for (ActionRequest request : bulkRequest.requests) {
|
||||||
if (request instanceof DocumentRequest) {
|
if (request instanceof DocumentRequest) {
|
||||||
DocumentRequest req = (DocumentRequest) request;
|
DocumentRequest req = (DocumentRequest) request;
|
||||||
if (!indices.contains(req.index())) {
|
Set<String> types = indicesAndTypes.get(req.index());
|
||||||
indices.add(req.index());
|
if (types == null) {
|
||||||
|
indicesAndTypes.put(req.index(), types = new HashSet<>());
|
||||||
}
|
}
|
||||||
|
types.add(req.type());
|
||||||
} else {
|
} else {
|
||||||
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
|
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final AtomicInteger counter = new AtomicInteger(indices.size());
|
final AtomicInteger counter = new AtomicInteger(indicesAndTypes.size());
|
||||||
ClusterState state = clusterService.state();
|
ClusterState state = clusterService.state();
|
||||||
for (final String index : indices) {
|
for (Map.Entry<String, Set<String>> entry : indicesAndTypes.entrySet()) {
|
||||||
|
final String index = entry.getKey();
|
||||||
if (autoCreateIndex.shouldAutoCreate(index, state)) {
|
if (autoCreateIndex.shouldAutoCreate(index, state)) {
|
||||||
createIndexAction.execute(new CreateIndexRequest(bulkRequest).index(index).cause("auto(bulk api)").masterNodeTimeout(bulkRequest.timeout()), new ActionListener<CreateIndexResponse>() {
|
CreateIndexRequest createIndexRequest = new CreateIndexRequest(bulkRequest);
|
||||||
|
createIndexRequest.index(index);
|
||||||
|
for (String type : entry.getValue()) {
|
||||||
|
createIndexRequest.mapping(type);
|
||||||
|
}
|
||||||
|
createIndexRequest.cause("auto(bulk api)");
|
||||||
|
createIndexRequest.masterNodeTimeout(bulkRequest.timeout());
|
||||||
|
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(CreateIndexResponse result) {
|
public void onResponse(CreateIndexResponse result) {
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
|
@ -29,6 +29,8 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
|
|||||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||||
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
||||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
|
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
@ -604,38 +606,66 @@ public class SimpleIndexTemplateTests extends ElasticsearchIntegrationTest {
|
|||||||
public void testStrictAliasParsingInIndicesCreatedViaTemplates() throws Exception {
|
public void testStrictAliasParsingInIndicesCreatedViaTemplates() throws Exception {
|
||||||
// Indexing into a should succeed, because the field mapping for field 'field' is defined in the test mapping.
|
// Indexing into a should succeed, because the field mapping for field 'field' is defined in the test mapping.
|
||||||
client().admin().indices().preparePutTemplate("template1")
|
client().admin().indices().preparePutTemplate("template1")
|
||||||
.setTemplate("a")
|
.setTemplate("a*")
|
||||||
.setOrder(0)
|
.setOrder(0)
|
||||||
.addMapping("test", "field", "type=string")
|
.addMapping("test", "field", "type=string")
|
||||||
.addAlias(new Alias("alias1").filter(termFilter("field", "value"))).get();
|
.addAlias(new Alias("alias1").filter(termFilter("field", "value"))).get();
|
||||||
// Indexing into b should succeed, because the field mapping for field 'field' is defined in the _default_ mapping and the test type exists.
|
// Indexing into b should succeed, because the field mapping for field 'field' is defined in the _default_ mapping and the test type exists.
|
||||||
client().admin().indices().preparePutTemplate("template2")
|
client().admin().indices().preparePutTemplate("template2")
|
||||||
.setTemplate("b")
|
.setTemplate("b*")
|
||||||
.setOrder(0)
|
.setOrder(0)
|
||||||
.addMapping("_default_", "field", "type=string")
|
.addMapping("_default_", "field", "type=string")
|
||||||
.addMapping("test")
|
.addMapping("test")
|
||||||
.addAlias(new Alias("alias2").filter(termFilter("field", "value"))).get();
|
.addAlias(new Alias("alias2").filter(termFilter("field", "value"))).get();
|
||||||
// Indexing into c should succeed, because the field mapping for field 'field' is defined in the _default_ mapping.
|
// Indexing into c should succeed, because the field mapping for field 'field' is defined in the _default_ mapping.
|
||||||
client().admin().indices().preparePutTemplate("template3")
|
client().admin().indices().preparePutTemplate("template3")
|
||||||
.setTemplate("c")
|
.setTemplate("c*")
|
||||||
.setOrder(0)
|
.setOrder(0)
|
||||||
.addMapping("_default_", "field", "type=string")
|
.addMapping("_default_", "field", "type=string")
|
||||||
.addAlias(new Alias("alias3").filter(termFilter("field", "value"))).get();
|
.addAlias(new Alias("alias3").filter(termFilter("field", "value"))).get();
|
||||||
// Indexing into d index should fail, since there is field with name 'field' in the mapping
|
// Indexing into d index should fail, since there is field with name 'field' in the mapping
|
||||||
client().admin().indices().preparePutTemplate("template4")
|
client().admin().indices().preparePutTemplate("template4")
|
||||||
.setTemplate("d")
|
.setTemplate("d*")
|
||||||
.setOrder(0)
|
.setOrder(0)
|
||||||
.addAlias(new Alias("alias4").filter(termFilter("field", "value"))).get();
|
.addAlias(new Alias("alias4").filter(termFilter("field", "value"))).get();
|
||||||
|
|
||||||
client().prepareIndex("a", "test", "test").setSource("{}").get();
|
client().prepareIndex("a1", "test", "test").setSource("{}").get();
|
||||||
client().prepareIndex("b", "test", "test").setSource("{}").get();
|
BulkResponse response = client().prepareBulk().add(new IndexRequest("a2", "test", "test").source("{}")).get();
|
||||||
client().prepareIndex("c", "test", "test").setSource("{}").get();
|
assertThat(response.hasFailures(), is(false));
|
||||||
|
assertThat(response.getItems()[0].isFailed(), equalTo(false));
|
||||||
|
assertThat(response.getItems()[0].getIndex(), equalTo("a2"));
|
||||||
|
assertThat(response.getItems()[0].getType(), equalTo("test"));
|
||||||
|
assertThat(response.getItems()[0].getId(), equalTo("test"));
|
||||||
|
assertThat(response.getItems()[0].getVersion(), equalTo(1l));
|
||||||
|
|
||||||
|
client().prepareIndex("b1", "test", "test").setSource("{}").get();
|
||||||
|
response = client().prepareBulk().add(new IndexRequest("b2", "test", "test").source("{}")).get();
|
||||||
|
assertThat(response.hasFailures(), is(false));
|
||||||
|
assertThat(response.getItems()[0].isFailed(), equalTo(false));
|
||||||
|
assertThat(response.getItems()[0].getIndex(), equalTo("b2"));
|
||||||
|
assertThat(response.getItems()[0].getType(), equalTo("test"));
|
||||||
|
assertThat(response.getItems()[0].getId(), equalTo("test"));
|
||||||
|
assertThat(response.getItems()[0].getVersion(), equalTo(1l));
|
||||||
|
|
||||||
|
client().prepareIndex("c1", "test", "test").setSource("{}").get();
|
||||||
|
response = client().prepareBulk().add(new IndexRequest("c2", "test", "test").source("{}")).get();
|
||||||
|
assertThat(response.hasFailures(), is(false));
|
||||||
|
assertThat(response.getItems()[0].isFailed(), equalTo(false));
|
||||||
|
assertThat(response.getItems()[0].getIndex(), equalTo("c2"));
|
||||||
|
assertThat(response.getItems()[0].getType(), equalTo("test"));
|
||||||
|
assertThat(response.getItems()[0].getId(), equalTo("test"));
|
||||||
|
assertThat(response.getItems()[0].getVersion(), equalTo(1l));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
client().prepareIndex("d", "test", "test").setSource("{}").get();
|
client().prepareIndex("d1", "test", "test").setSource("{}").get();
|
||||||
fail();
|
fail();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchIllegalArgumentException.class));
|
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchIllegalArgumentException.class));
|
||||||
assertThat(e.getMessage(), containsString("failed to parse filter for alias [alias4]"));
|
assertThat(e.getMessage(), containsString("failed to parse filter for alias [alias4]"));
|
||||||
}
|
}
|
||||||
|
response = client().prepareBulk().add(new IndexRequest("d2", "test", "test").source("{}")).get();
|
||||||
|
assertThat(response.hasFailures(), is(true));
|
||||||
|
assertThat(response.getItems()[0].isFailed(), equalTo(true));
|
||||||
|
assertThat(response.getItems()[0].getFailureMessage(), containsString("failed to parse filter for alias [alias4]"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user