Make ingest pipeline resolution logic unit testable (#47026)

Extracted ingest pipeline resolution logic into a static method
and added unit tests for pipeline resolution logic.

Followup from #46847
This commit is contained in:
Martijn van Groningen 2019-09-25 10:35:36 +02:00
parent b234d2cbd7
commit eef1ba3fad
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
2 changed files with 266 additions and 106 deletions

View File

@ -54,7 +54,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -159,115 +158,13 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
boolean hasIndexRequestsWithPipelines = false;
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
// check the alias for the action request (this is how upserts are modeled)
if (indexMetaData == null && actionRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
}
indexRequest.setPipeline(pipeline);
}
if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}
}
if (hasIndexRequestsWithPipelines) {
@ -363,6 +260,112 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
IndexRequest indexRequest,
MetaData metaData) {
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
// check the alias for the action request (this is how upserts are modeled)
if (indexMetaData == null && originalRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(originalRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
}
indexRequest.setPipeline(pipeline);
}
if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}
/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
}
// Return whether this index request has a pipeline
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;
}
boolean needToCheck() {
return autoCreateIndex.needToCheck();
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
@ -27,11 +28,17 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
@ -44,6 +51,8 @@ import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class TransportBulkActionTests extends ESTestCase {
@ -153,4 +162,152 @@ public class TransportBulkActionTests extends ESTestCase {
UpdateRequest badUpsertRequest = new UpdateRequest("index", "type", "id1");
assertNull(TransportBulkAction.getIndexWriteRequest(badUpsertRequest));
}
public void testResolveRequiredOrDefaultPipelineDefaultPipeline() {
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetaData.builder("alias").writeIndex(true).build());
MetaData metaData = MetaData.builder().put(builder).build();
// index name matches with IDM:
IndexRequest indexRequest = new IndexRequest("idx");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));
// alias name matches with IDM:
indexRequest = new IndexRequest("alias");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));
// index name matches with ITMD:
IndexTemplateMetaData.Builder templateBuilder = IndexTemplateMetaData.builder("name1")
.patterns(Collections.singletonList("id*"))
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"));
metaData = MetaData.builder().put(templateBuilder).build();
indexRequest = new IndexRequest("idx");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));
}
public void testResolveRequiredOrDefaultPipelineRequiredPipeline() {
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetaData.builder("alias").writeIndex(true).build());
MetaData metaData = MetaData.builder().put(builder).build();
// index name matches with IDM:
IndexRequest indexRequest = new IndexRequest("idx");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
// alias name matches with IDM:
indexRequest = new IndexRequest("alias");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
// index name matches with ITMD:
IndexTemplateMetaData.Builder templateBuilder = IndexTemplateMetaData.builder("name1")
.patterns(Collections.singletonList("id*"))
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"));
metaData = MetaData.builder().put(templateBuilder).build();
indexRequest = new IndexRequest("idx");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
}
public void testResolveRequiredOrDefaultAndRequiredPipeline() {
IndexTemplateMetaData.Builder builder1 = IndexTemplateMetaData.builder("name1")
.patterns(Collections.singletonList("i*"))
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"));
IndexTemplateMetaData.Builder builder2 = IndexTemplateMetaData.builder("name2")
.patterns(Collections.singletonList("id*"))
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"));
MetaData metaData = MetaData.builder().put(builder1).put(builder2).build();
IndexRequest indexRequest = new IndexRequest("idx");
Exception e = expectThrows(IllegalArgumentException.class,
() -> TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData));
assertThat(e.getMessage(),
equalTo("required pipeline [required-pipeline] and default pipeline [default-pipeline] can not both be set"));
}
public void testResolveRequiredOrDefaultPipelineRequestPipeline() {
// no pipeline:
{
MetaData metaData = MetaData.builder().build();
IndexRequest indexRequest = new IndexRequest("idx");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(false));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
}
// request pipeline:
{
MetaData metaData = MetaData.builder().build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("request-pipeline"));
}
// request pipeline with default pipeline:
{
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
MetaData metaData = MetaData.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("request-pipeline"));
}
// request pipeline with required pipeline:
{
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
MetaData metaData = MetaData.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
Exception e = expectThrows(IllegalArgumentException.class,
() -> TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData));
assertThat(e.getMessage(),
equalTo("request pipeline [request-pipeline] can not override required pipeline [required-pipeline]"));
}
// request pipeline set to required pipeline:
{
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
MetaData metaData = MetaData.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("required-pipeline").isPipelineResolved(true);
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
}
}
}