From bd776261773ed2ba640c8f35f3803e2b946c62bc Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 19 Sep 2019 16:20:12 -0400 Subject: [PATCH] Add the ability to require an ingest pipeline (#46847) This commit adds the ability to require an ingest pipeline on an index. Today we can have a default pipeline, but that could be overridden by a request pipeline parameter. This commit introduces a new index setting index.required_pipeline that acts similarly to index.default_pipeline, except that it can not be overridden by a request pipeline parameter. Additionally, a default pipeline and a request pipeline can not both be set. The required pipeline can be set to _none to ensure that no pipeline ever runs for index requests on that index. --- docs/reference/index-modules.asciidoc | 9 +- .../test/ingest/240_required_pipeline.yml | 175 ++++++++++++++++++ .../action/bulk/TransportBulkAction.java | 101 ++++++++-- .../action/index/IndexRequest.java | 36 +++- .../common/settings/IndexScopedSettings.java | 1 + .../elasticsearch/index/IndexSettings.java | 85 ++++++++- ...ActionIndicesThatCannotBeCreatedTests.java | 4 + .../index/RequiredPipelineIT.java | 133 +++++++++++++ .../action/TransportResumeFollowAction.java | 1 + 9 files changed, 513 insertions(+), 32 deletions(-) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml create mode 100644 server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 93da06d9b96..9ae1dac826e 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -234,13 +234,20 @@ specific index module: The length of time that a <> remains available for <>. Defaults to `60s`. - `index.default_pipeline`:: + `index.default_pipeline`:: The default <> pipeline for this index. Index requests will fail if the default pipeline is set and the pipeline does not exist. The default may be overridden using the `pipeline` parameter. The special pipeline name `_none` indicates no ingest pipeline should be run. + `index.required_pipeline`:: + The required <> pipeline for this index. Index requests + will fail if the required pipeline is set and the pipeline does not exist. + The required pipeline can not be overridden with the `pipeline` parameter. A + default pipeline and a required pipeline can not both be set. The special + pipeline name `_none` indicates no ingest pipeline will run. + [float] === Settings in other index modules diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml new file mode 100644 index 00000000000..01553bcf40a --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml @@ -0,0 +1,175 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test index with required pipeline": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "bytes" : { + "field" : "bytes_source_field", + "target_field" : "bytes_target_field" + } + } + ] + } + - match: { acknowledged: true } + # required pipeline via index + - do: + indices.create: + index: test + body: + settings: + index: + required_pipeline: "my_pipeline" + aliases: + test_alias: {} + + - do: + index: + index: test + id: 1 + body: {bytes_source_field: "1kb"} + + - do: + get: + index: test + id: 1 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via alias + - do: + index: + index: test_alias + id: 2 + body: {bytes_source_field: "1kb"} + + - do: + get: + index: test + id: 2 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via upsert + - do: + update: + index: test + id: 3 + body: + script: + source: "ctx._source.ran_script = true" + lang: "painless" + upsert: { "bytes_source_field":"1kb" } + - do: + get: + index: test + id: 3 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via scripted upsert + - do: + update: + index: test + id: 4 + body: + script: + source: "ctx._source.bytes_source_field = '1kb'" + lang: "painless" + upsert : {} + scripted_upsert: true + - do: + get: + index: test + id: 4 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via doc_as_upsert + - do: + update: + index: test + id: 5 + body: + doc: { "bytes_source_field":"1kb" } + doc_as_upsert: true + - do: + get: + index: test + id: 5 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via bulk upsert + # note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline + # needs to be in the upsert, not the script + - do: + bulk: + refresh: true + body: | + {"update":{"_id":"6","_index":"test"}} + {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}} + {"update":{"_id":"7","_index":"test"}} + {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true} + {"update":{"_id":"8","_index":"test"}} + {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true} + {"update":{"_id":"6_alias","_index":"test_alias"}} + {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}} + {"update":{"_id":"7_alias","_index":"test_alias"}} + {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true} + {"update":{"_id":"8_alias","_index":"test_alias"}} + {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true} + + - do: + mget: + body: + docs: + - { _index: "test", _id: "6" } + - { _index: "test", _id: "7" } + - { _index: "test", _id: "8" } + - { _index: "test", _id: "6_alias" } + - { _index: "test", _id: "7_alias" } + - { _index: "test", _id: "8_alias" } + - match: { docs.0._index: "test" } + - match: { docs.0._id: "6" } + - match: { docs.0._source.bytes_source_field: "1kb" } + - match: { docs.0._source.bytes_target_field: 1024 } + - is_false: docs.0._source.ran_script + - match: { docs.1._index: "test" } + - match: { docs.1._id: "7" } + - match: { docs.1._source.bytes_source_field: "2kb" } + - match: { docs.1._source.bytes_target_field: 2048 } + - match: { docs.2._index: "test" } + - match: { docs.2._id: "8" } + - match: { docs.2._source.bytes_source_field: "3kb" } + - match: { docs.2._source.bytes_target_field: 3072 } + - match: { docs.2._source.ran_script: true } + - match: { docs.3._index: "test" } + - match: { docs.3._id: "6_alias" } + - match: { docs.3._source.bytes_source_field: "1kb" } + - match: { docs.3._source.bytes_target_field: 1024 } + - is_false: docs.3._source.ran_script + - match: { docs.4._index: "test" } + - match: { docs.4._id: "7_alias" } + - match: { docs.4._source.bytes_source_field: "2kb" } + - match: { docs.4._source.bytes_target_field: 2048 } + - match: { docs.5._index: "test" } + - match: { docs.5._id: "8_alias" } + - match: { docs.5._source.bytes_source_field: "3kb" } + - match: { docs.5._source.bytes_target_field: 3072 } + - match: { docs.5._source.ran_script: true } + + # bad request, request pipeline can not be specified + - do: + catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/ + index: + index: test + id: 9 + pipeline: "pipeline" + body: {bytes_source_field: "1kb"} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index fc4dee1c1b2..b70780546d6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SparseFixedBitSet; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; @@ -76,6 +77,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -160,11 +162,14 @@ public class TransportBulkAction extends HandledTransportAction indicesMetaData = metaData.indices(); for (DocWriteRequest actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); + if (indexRequest != null) { - // get pipeline from request - String pipeline = indexRequest.getPipeline(); - if (pipeline == null) { - // start to look for default pipeline via settings found in the index meta data + if (indexRequest.isPipelineResolved() == false) { + final String requestPipeline = indexRequest.getPipeline(); + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); + boolean requestCanOverridePipeline = true; + String requiredPipeline = null; + // start to look for default or required pipelines via settings found in the index meta data IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); // check the alias for the index request (this is how normal index requests are modeled) if (indexMetaData == null && indexRequest.index() != null) { @@ -183,34 +188,86 @@ public class TransportBulkAction extends HandledTransportAction templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); assert (templates != null); - String defaultPipeline = IngestService.NOOP_PIPELINE_NAME; - // order of templates are highest order first, break if we find a default_pipeline + // order of templates are highest order first, we have to iterate through them all though + String defaultPipeline = null; for (IndexTemplateMetaData template : templates) { final Settings settings = template.settings(); - if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) { + requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings); + requestCanOverridePipeline = false; + // we can not break in case a lower-order template has a default pipeline that we need to reject + } else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); - break; + // we can not break in case a lower-order template has a required pipeline that we need to reject } } - indexRequest.setPipeline(defaultPipeline); - if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { - hasIndexRequestsWithPipelines = true; + if (requiredPipeline != null && defaultPipeline != null) { + // we can not have picked up a required and a default pipeline from applying templates + final String message = String.format( + Locale.ROOT, + "required pipeline [%s] and default pipeline [%s] can not both be set", + requiredPipeline, + defaultPipeline); + throw new IllegalArgumentException(message); + } + final String pipeline; + if (requiredPipeline != null) { + pipeline = requiredPipeline; + } else { + pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME; + } + indexRequest.setPipeline(pipeline); + } + + if (requestPipeline != null) { + if (requestCanOverridePipeline == false) { + final String message = String.format( + Locale.ROOT, + "request pipeline [%s] can not override required pipeline [%s]", + requestPipeline, + requiredPipeline); + throw new IllegalArgumentException(message); + } else { + indexRequest.setPipeline(requestPipeline); } } - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { + + if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) { + hasIndexRequestsWithPipelines = true; + } + /* + * We have to track whether or not the pipeline for this request has already been resolved. It can happen that the + * pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request + * has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have + * already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we + * can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been + * set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish + * these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request + * pipeline parameter too. + */ + indexRequest.isPipelineResolved(true); + } else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) { hasIndexRequestsWithPipelines = true; } } + } if (hasIndexRequestsWithPipelines) { @@ -221,6 +278,14 @@ public class TransportBulkAction extends HandledTransportAction implement private String pipeline; + private boolean isPipelineResolved; + /** * Value for {@link #getAutoGeneratedTimestamp()} if the document has an external * provided ID. @@ -131,6 +133,9 @@ public class IndexRequest extends ReplicatedWriteRequest implement version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); pipeline = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_7_5_0)) { + isPipelineResolved = in.readBoolean(); + } isRetry = in.readBoolean(); autoGeneratedTimestamp = in.readLong(); if (in.readBoolean()) { @@ -261,7 +266,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement @Override public String type() { if (type == null) { - return MapperService.SINGLE_MAPPING_NAME; + return MapperService.SINGLE_MAPPING_NAME; } return type; } @@ -290,7 +295,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement type = defaultType; } return this; - } + } /** * The id of the indexed document. If not set, will be automatically generated. */ @@ -345,6 +350,26 @@ public class IndexRequest extends ReplicatedWriteRequest implement return this.pipeline; } + /** + * Sets if the pipeline for this request has been resolved by the coordinating node. + * + * @param isPipelineResolved true if the pipeline has been resolved + * @return the request + */ + public IndexRequest isPipelineResolved(final boolean isPipelineResolved) { + this.isPipelineResolved = isPipelineResolved; + return this; + } + + /** + * Returns whether or not the pipeline for this request has been resolved by the coordinating node. + * + * @return true if the pipeline has been resolved + */ + public boolean isPipelineResolved() { + return this.isPipelineResolved; + } + /** * The source of the document to index, recopied to a new array if it is unsafe. */ @@ -633,8 +658,8 @@ public class IndexRequest extends ReplicatedWriteRequest implement @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - // A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions. - // So we use the type accessor method here to make the type non-null (will default it to "_doc"). + // A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions. + // So we use the type accessor method here to make the type non-null (will default it to "_doc"). out.writeOptionalString(type()); out.writeOptionalString(id); out.writeOptionalString(routing); @@ -653,6 +678,9 @@ public class IndexRequest extends ReplicatedWriteRequest implement out.writeLong(version); out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); + if (out.getVersion().onOrAfter(Version.V_7_5_0)) { + out.writeBoolean(isPipelineResolved); + } out.writeBoolean(isRetry); out.writeLong(autoGeneratedTimestamp); if (contentType != null) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 4dc99c5f9ea..6a95eac713a 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -166,6 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS, IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS, IndexSettings.DEFAULT_PIPELINE, + IndexSettings.REQUIRED_PIPELINE, MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, // validate that built-in similarities don't get redefined diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 19e99a65eb5..ab7a4fc9e64 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -35,8 +35,10 @@ import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.Node; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -299,12 +301,67 @@ public final class IndexSettings { 1000, 1, Property.Dynamic, Property.IndexScope); public static final Setting DEFAULT_PIPELINE = - new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> { - if (s == null || s.isEmpty()) { - throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string."); - } - return s; - }, Property.Dynamic, Property.IndexScope); + new Setting<>("index.default_pipeline", + IngestService.NOOP_PIPELINE_NAME, + Function.identity(), + new DefaultPipelineValidator(), + Property.Dynamic, + Property.IndexScope); + + public static final Setting REQUIRED_PIPELINE = + new Setting<>("index.required_pipeline", + IngestService.NOOP_PIPELINE_NAME, + Function.identity(), + new RequiredPipelineValidator(), + Property.Dynamic, + Property.IndexScope); + + static class DefaultPipelineValidator implements Setting.Validator { + + @Override + public void validate(final String value) { + + } + + @Override + public void validate(final String value, final Map, String> settings) { + final String requiredPipeline = settings.get(IndexSettings.REQUIRED_PIPELINE); + if (value.equals(IngestService.NOOP_PIPELINE_NAME) == false + && requiredPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) { + throw new IllegalArgumentException( + "index has a default pipeline [" + value + "] and a required pipeline [" + requiredPipeline + "]"); + } + } + + @Override + public Iterator> settings() { + return Collections.singletonList(REQUIRED_PIPELINE).iterator(); + } + + } + + static class RequiredPipelineValidator implements Setting.Validator { + + @Override + public void validate(final String value) { + + } + + @Override + public void validate(final String value, final Map, String> settings) { + final String defaultPipeline = settings.get(IndexSettings.DEFAULT_PIPELINE); + if (value.equals(IngestService.NOOP_PIPELINE_NAME) && defaultPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) { + throw new IllegalArgumentException( + "index has a required pipeline [" + value + "] and a default pipeline [" + defaultPipeline + "]"); + } + } + + @Override + public Iterator> settings() { + return Collections.singletonList(DEFAULT_PIPELINE).iterator(); + } + + } /** * Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently @@ -384,6 +441,7 @@ public final class IndexSettings { private volatile int maxAnalyzedOffset; private volatile int maxTermsCount; private volatile String defaultPipeline; + private volatile String requiredPipeline; private volatile boolean searchThrottled; /** @@ -555,6 +613,7 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); + scopedSettings.addSettingsUpdateConsumer(REQUIRED_PIPELINE, this::setRequiredPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis); @@ -746,7 +805,7 @@ public final class IndexSettings { public void setTranslogSyncInterval(TimeValue translogSyncInterval) { this.syncInterval = translogSyncInterval; } - + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ @@ -825,7 +884,7 @@ public final class IndexSettings { * Returns the max number of filters in adjacency_matrix aggregation search requests * @deprecated This setting will be removed in 8.0 */ - @Deprecated + @Deprecated public int getMaxAdjacencyMatrixFilters() { return this.maxAdjacencyMatrixFilters; } @@ -834,7 +893,7 @@ public final class IndexSettings { * @param maxAdjacencyFilters the max number of filters in adjacency_matrix aggregation search requests * @deprecated This setting will be removed in 8.0 */ - @Deprecated + @Deprecated private void setMaxAdjacencyMatrixFilters(int maxAdjacencyFilters) { this.maxAdjacencyMatrixFilters = maxAdjacencyFilters; } @@ -992,6 +1051,14 @@ public final class IndexSettings { this.defaultPipeline = defaultPipeline; } + public String getRequiredPipeline() { + return requiredPipeline; + } + + public void setRequiredPipeline(final String requiredPipeline) { + this.requiredPipeline = requiredPipeline; + } + /** * Returns true if soft-delete is enabled. */ diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index f213b523fbf..4bab8636bbf 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -106,6 +107,9 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa ClusterState state = mock(ClusterState.class); when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); when(clusterService.state()).thenReturn(state); + DiscoveryNode localNode = mock(DiscoveryNode.class); + when(clusterService.localNode()).thenReturn(localNode); + when(localNode.isIngestNode()).thenReturn(randomBoolean()); final ThreadPool threadPool = mock(ThreadPool.class); final ExecutorService direct = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); diff --git a/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java b/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java new file mode 100644 index 00000000000..2566dc02c5d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasToString; + +public class RequiredPipelineIT extends ESIntegTestCase { + + public void testRequiredPipeline() { + final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); + createIndex("index", settings); + + // this asserts that the required_pipeline was used, without us having to actually create the pipeline etc. + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get()); + assertThat(e, hasToString(containsString("pipeline with id [required_pipeline] does not exist"))); + } + + public void testDefaultAndRequiredPipeline() { + final Settings settings = Settings.builder() + .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") + .put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline") + .build(); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> createIndex("index", settings)); + assertThat( + e, + hasToString(containsString("index has a default pipeline [default_pipeline] and a required pipeline [required_pipeline]"))); + } + + public void testDefaultAndRequiredPipelineFromTemplates() { + final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); + final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); + final int requiredPipelineOrder; + final int defaultPipelineOrder; + if (randomBoolean()) { + defaultPipelineOrder = lowOrder; + requiredPipelineOrder = highOrder; + } else { + defaultPipelineOrder = highOrder; + requiredPipelineOrder = lowOrder; + } + final Settings defaultPipelineSettings = + Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + admin().indices() + .preparePutTemplate("default") + .setPatterns(Collections.singletonList("index*")) + .setOrder(defaultPipelineOrder) + .setSettings(defaultPipelineSettings) + .get(); + final Settings requiredPipelineSettings = + Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); + admin().indices() + .preparePutTemplate("required") + .setPatterns(Collections.singletonList("index*")) + .setOrder(requiredPipelineOrder) + .setSettings(requiredPipelineSettings) + .get(); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get()); + assertThat( + e, + hasToString(containsString( + "required pipeline [required_pipeline] and default pipeline [default_pipeline] can not both be set"))); + } + + public void testHighOrderRequiredPipelinePreferred() throws IOException { + final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); + final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); + final Settings defaultPipelineSettings = + Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "low_order_required_pipeline").build(); + admin().indices() + .preparePutTemplate("default") + .setPatterns(Collections.singletonList("index*")) + .setOrder(lowOrder) + .setSettings(defaultPipelineSettings) + .get(); + final Settings requiredPipelineSettings = + Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "high_order_required_pipeline").build(); + admin().indices() + .preparePutTemplate("required") + .setPatterns(Collections.singletonList("index*")) + .setOrder(highOrder) + .setSettings(requiredPipelineSettings) + .get(); + + // this asserts that the high_order_required_pipeline was selected, without us having to actually create the pipeline etc. + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get()); + assertThat(e, hasToString(containsString("pipeline with id [high_order_required_pipeline] does not exist"))); + } + + public void testRequiredPipelineAndRequestPipeline() { + final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); + createIndex("index", settings); + final IndexRequestBuilder builder = client().prepareIndex("index", "_doc", "1"); + builder.setSource(Collections.singletonMap("field", "value")); + builder.setPipeline("request_pipeline"); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::get); + assertThat( + e, + hasToString(containsString("request pipeline [request_pipeline] can not override required pipeline [required_pipeline]"))); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index a1e9c2aea08..5f262b23bbb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -397,6 +397,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction