Replace required pipeline with final pipeline (#49470)

This commit enhances the required pipeline functionality by changing it
so that default/request pipelines can also be executed, while the required
pipeline is always executed last. This gives users the flexibility to
execute their own indexing pipelines while still ensuring that any required
pipeline is also executed. Since such pipelines are executed last, we rename
required pipelines to final pipelines.
Jason Tedor 2019-11-22 14:00:38 -05:00
parent 1431c2b408
commit 71bcfbf1e3
13 changed files with 521 additions and 375 deletions
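
To make the new ordering concrete: at most one of the request or default pipelines runs first, and the final pipeline, if set, always runs afterwards and can not be bypassed by the `pipeline` request parameter. A minimal, self-contained Java sketch of that rule (illustrative only, loosely following TransportBulkAction#resolvePipelines and IngestService#executeBulkRequest below; it uses no Elasticsearch classes):

import java.util.ArrayList;
import java.util.List;

class PipelineOrderSketch {

    static final String NONE = "_none";

    // Pipelines to execute, in order: a request-level pipeline overrides the index
    // default, and the final pipeline (if any) always runs last.
    static List<String> executionOrder(String requestPipeline, String defaultPipeline, String finalPipeline) {
        final List<String> order = new ArrayList<>();
        final String first = requestPipeline != null ? requestPipeline
            : defaultPipeline != null ? defaultPipeline
            : NONE;
        if (NONE.equals(first) == false) {
            order.add(first);
        }
        if (finalPipeline != null && NONE.equals(finalPipeline) == false) {
            order.add(finalPipeline);
        }
        return order;
    }

    public static void main(String[] args) {
        // prints [request_pipeline, final_pipeline]
        System.out.println(executionOrder("request_pipeline", "default_pipeline", "final_pipeline"));
    }

}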

View File

@ -244,12 +244,12 @@ specific index module:
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
no ingest pipeline should be run.
`index.required_pipeline`::
The required <<ingest,ingest node>> pipeline for this index. Index requests
will fail if the required pipeline is set and the pipeline does not exist.
The required pipeline can not be overridden with the `pipeline` parameter. A
default pipeline and a required pipeline can not both be set. The special
pipeline name `_none` indicates no ingest pipeline will run.
`index.final_pipeline`::
The final <<ingest,ingest node>> pipeline for this index. Index requests
will fail if the final pipeline is set and the pipeline does not exist.
The final pipeline always runs after the request pipeline (if specified) and
the default pipeline (if it exists). The special pipeline name `_none`
indicates no ingest pipeline will run.
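
For illustration, a default pipeline and a final pipeline can now be set on the same index; a fragment sketch using the Java request classes, with placeholder index and pipeline names:

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

// A default pipeline and a final pipeline may coexist on the same index; the final
// pipeline runs after whichever of the request or default pipeline ran, if any.
CreateIndexRequest request = new CreateIndexRequest("my-index")
    .settings(Settings.builder()
        .put("index.default_pipeline", "my-default-pipeline")
        .put("index.final_pipeline", "my-final-pipeline")
        .build());

Unlike the old `index.required_pipeline`, this combination is no longer rejected, and a request-level `pipeline` parameter no longer raises an error either.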
[float]
=== Settings in other index modules

View File

@ -3,7 +3,7 @@
[partintro]
--
Use an ingest node to pre-process documents before the actual document indexing happens.
The ingest node intercepts bulk and index requests, applies the transformations, and then
passes the documents back to the index or bulk APIs.
@ -23,7 +23,7 @@ another processor that renames a field. The <<cluster-state,cluster state>> then
the configured pipelines.
To use a pipeline, simply specify the `pipeline` parameter on an index or bulk request. This
way, the ingest node knows which pipeline to use.
For example:
Create a pipeline
@ -79,6 +79,9 @@ Response
An index may also declare a <<dynamic-index-settings,default pipeline>> that will be used in the
absence of the `pipeline` parameter.
Finally, an index may also declare a <<dynamic-index-settings,final pipeline>>
that will be executed after any request or default pipeline (if any).
See <<ingest-apis,Ingest APIs>> for more information about creating, adding, and deleting pipelines.
--
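
For example, using the client-side request classes that also appear in the tests below, a pipeline can be registered and then referenced per request (a minimal sketch; the pipeline id, processor, and index name are placeholders):

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;

// Register a pipeline with a single set processor ...
PutPipelineRequest putPipeline = new PutPipelineRequest(
    "my_pipeline",
    new BytesArray("{\"processors\": [{\"set\": {\"field\": \"ingested\", \"value\": true}}]}"),
    XContentType.JSON);

// ... then point an individual index request at it via the pipeline parameter.
IndexRequest indexRequest = new IndexRequest("my-index")
    .source(XContentType.JSON, "field", "value")
    .setPipeline("my_pipeline");

A default or final pipeline declared in the index settings is applied in the same way, without the per-request parameter.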

View File

@ -6,7 +6,7 @@ teardown:
ignore: 404
---
"Test index with required pipeline":
"Test index with final pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -23,14 +23,14 @@ teardown:
]
}
- match: { acknowledged: true }
# required pipeline via index
# final pipeline via index
- do:
indices.create:
index: test
body:
settings:
index:
required_pipeline: "my_pipeline"
final_pipeline: "my_pipeline"
aliases:
test_alias: {}
@ -46,7 +46,7 @@ teardown:
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via alias
# final pipeline via alias
- do:
index:
index: test_alias
@ -59,7 +59,7 @@ teardown:
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via upsert
# final pipeline via upsert
- do:
update:
index: test
@ -75,7 +75,7 @@ teardown:
id: 3
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via scripted upsert
# final pipeline via scripted upsert
- do:
update:
index: test
@ -92,7 +92,7 @@ teardown:
id: 4
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via doc_as_upsert
# final pipeline via doc_as_upsert
- do:
update:
index: test
@ -106,7 +106,7 @@ teardown:
id: 5
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via bulk upsert
# final pipeline via bulk upsert
# note - bulk scripted upserts execute the pipeline before the script, so any data referenced by the pipeline
# needs to be in the upsert, not the script
- do:
@ -164,12 +164,3 @@ teardown:
- match: { docs.5._source.bytes_source_field: "3kb" }
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }
# bad request, request pipeline can not be specified
- do:
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
index:
index: test
id: 9
pipeline: "pipeline"
body: {bytes_source_field: "1kb"}

View File

@ -79,7 +79,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -167,7 +166,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
boolean indexRequestHasPipeline = resolvePipelines(actionRequest, indexRequest, metaData);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}
@ -273,16 +272,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
IndexRequest indexRequest,
MetaData metaData) {
static boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final IndexRequest indexRequest, final MetaData metaData) {
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
indexRequest.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
String defaultPipeline = null;
String finalPipeline = null;
// start to look for default or final pipelines via settings found in the index meta data
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
@ -302,64 +299,42 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) {
// find the default pipeline if one is defined from an existing index setting
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) {
// find the final pipeline if one is defined from an existing index setting
finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings);
indexRequest.setFinalPipeline(finalPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
// the index does not exist yet (and this is a valid request), so match index templates to look for pipelines
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
// order of templates are highest order first
for (final IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
// we can not break in case a lower-order template has a final pipeline that we need to collect
}
if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) {
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
// we can not break in case a lower-order template has a default pipeline that we need to collect
}
if (defaultPipeline != null && finalPipeline != null) {
// we can break if we have already collected a default and final pipeline
break;
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
}
indexRequest.setPipeline(pipeline);
indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME);
indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : IngestService.NOOP_PIPELINE_NAME);
}
if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
indexRequest.setPipeline(requestPipeline);
}
/*
@ -375,8 +350,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
indexRequest.isPipelineResolved(true);
}
// Return whether this index request has a pipeline
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;
// return whether this index request has a pipeline
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false
|| IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
}
boolean needToCheck() {

View File

@ -101,6 +101,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private XContentType contentType;
private String pipeline;
private String finalPipeline;
private boolean isPipelineResolved;
@ -133,6 +134,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
finalPipeline = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
isPipelineResolved = in.readBoolean();
}
@ -246,6 +250,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}
if (finalPipeline != null && finalPipeline.isEmpty()) {
validationException = addValidationError("final pipeline cannot be an empty string", validationException);
}
return validationException;
}
@ -350,6 +357,26 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
return this.pipeline;
}
/**
* Sets the final ingest pipeline to be executed before indexing the document.
*
* @param finalPipeline the name of the final pipeline
* @return this index request
*/
public IndexRequest setFinalPipeline(final String finalPipeline) {
this.finalPipeline = finalPipeline;
return this;
}
/**
* Returns the final ingest pipeline to be executed before indexing the document.
*
* @return the name of the final pipeline
*/
public String getFinalPipeline() {
return this.finalPipeline;
}
/**
* Sets if the pipeline for this request has been resolved by the coordinating node.
*
@ -686,6 +713,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeOptionalString(finalPipeline);
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(isPipelineResolved);
}

View File

@ -166,7 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,
IndexSettings.REQUIRED_PIPELINE,
IndexSettings.FINAL_PIPELINE,
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
// validate that built-in similarities don't get redefined

View File

@ -35,10 +35,8 @@ import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.Node;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@ -312,67 +310,16 @@ public final class IndexSettings {
new Setting<>("index.default_pipeline",
IngestService.NOOP_PIPELINE_NAME,
Function.identity(),
new DefaultPipelineValidator(),
Property.Dynamic,
Property.IndexScope);
public static final Setting<String> REQUIRED_PIPELINE =
new Setting<>("index.required_pipeline",
public static final Setting<String> FINAL_PIPELINE =
new Setting<>("index.final_pipeline",
IngestService.NOOP_PIPELINE_NAME,
Function.identity(),
new RequiredPipelineValidator(),
Property.Dynamic,
Property.IndexScope);
static class DefaultPipelineValidator implements Setting.Validator<String> {
@Override
public void validate(final String value) {
}
@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
final String requiredPipeline = (String) settings.get(IndexSettings.REQUIRED_PIPELINE);
if (value.equals(IngestService.NOOP_PIPELINE_NAME) == false
&& requiredPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) {
throw new IllegalArgumentException(
"index has a default pipeline [" + value + "] and a required pipeline [" + requiredPipeline + "]");
}
}
@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(REQUIRED_PIPELINE);
return settings.iterator();
}
}
static class RequiredPipelineValidator implements Setting.Validator<String> {
@Override
public void validate(final String value) {
}
@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
final String defaultPipeline = (String) settings.get(IndexSettings.DEFAULT_PIPELINE);
if (value.equals(IngestService.NOOP_PIPELINE_NAME) && defaultPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) {
throw new IllegalArgumentException(
"index has a required pipeline [" + value + "] and a default pipeline [" + defaultPipeline + "]");
}
}
@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(DEFAULT_PIPELINE);
return settings.iterator();
}
}
/**
* Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently
*/
@ -623,7 +570,7 @@ public final class IndexSettings {
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
scopedSettings.addSettingsUpdateConsumer(REQUIRED_PIPELINE, this::setRequiredPipeline);
scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);

View File

@ -53,6 +53,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -333,15 +334,15 @@ public class IngestService implements ClusterStateApplier {
public void executeBulkRequest(int numberOfActionRequests,
Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<Integer, Exception> itemFailureHandler,
BiConsumer<Thread, Exception> completionHandler,
IntConsumer itemDroppedHandler) {
BiConsumer<Integer, Exception> onFailure,
BiConsumer<Thread, Exception> onCompletion,
IntConsumer onDropped) {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
completionHandler.accept(null, e);
onCompletion.accept(null, e);
}
@Override
@ -353,54 +354,83 @@ public class IngestService implements ClusterStateApplier {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest == null) {
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
continue;
}
String pipelineId = indexRequest.getPipeline();
if (NOOP_PIPELINE_NAME.equals(pipelineId)) {
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
continue;
}
final int slot = i;
try {
PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
Pipeline pipeline = holder.pipeline;
innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> {
if (e == null) {
// this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
} else {
itemFailureHandler.accept(slot, e);
}
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
});
} catch (Exception e) {
itemFailureHandler.accept(slot, e);
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
final String pipelineId = indexRequest.getPipeline();
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
final String finalPipelineId = indexRequest.getFinalPipeline();
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
final List<String> pipelines;
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = Arrays.asList(pipelineId, finalPipelineId);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false ) {
pipelines = Collections.singletonList(pipelineId);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = Collections.singletonList(finalPipelineId);
} else {
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
continue;
}
executePipelines(i, pipelines.iterator(), indexRequest, onDropped, onFailure, counter, onCompletion, originalThread);
i++;
}
}
});
}
private void executePipelines(
final int slot,
final Iterator<String> it,
final IndexRequest indexRequest,
final IntConsumer onDropped,
final BiConsumer<Integer, Exception> onFailure,
final AtomicInteger counter,
final BiConsumer<Thread, Exception> onCompletion,
final Thread originalThread
) {
while (it.hasNext()) {
final String pipelineId = it.next();
try {
PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
Pipeline pipeline = holder.pipeline;
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
if (e != null) {
onFailure.accept(slot, e);
}
if (it.hasNext()) {
executePipelines(slot, it, indexRequest, onDropped, onFailure, counter, onCompletion, originalThread);
} else {
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
}
});
} catch (Exception e) {
onFailure.accept(slot, e);
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
break;
}
}
}
public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);

View File

@ -88,7 +88,7 @@ public class TransportBulkActionTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("TransportBulkActionTookTests");
threadPool = new TestThreadPool(getClass().getName());
clusterService = createClusterService(threadPool);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool,
@ -173,14 +173,14 @@ public class TransportBulkActionTests extends ESTestCase {
// index name matches with IDM:
IndexRequest indexRequest = new IndexRequest("idx");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));
// alias name matches with IDM:
indexRequest = new IndexRequest("alias");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));
@ -191,15 +191,15 @@ public class TransportBulkActionTests extends ESTestCase {
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"));
metaData = MetaData.builder().put(templateBuilder).build();
indexRequest = new IndexRequest("idx");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));
}
public void testResolveRequiredOrDefaultPipelineRequiredPipeline() {
public void testResolveFinalPipeline() {
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"))
.settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetaData.builder("alias").writeIndex(true).build());
@ -207,52 +207,39 @@ public class TransportBulkActionTests extends ESTestCase {
// index name matches with IDM:
IndexRequest indexRequest = new IndexRequest("idx");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
assertThat(indexRequest.getPipeline(), equalTo("_none"));
assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
// alias name matches with IDM:
indexRequest = new IndexRequest("alias");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
assertThat(indexRequest.getPipeline(), equalTo("_none"));
assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
// index name matches with ITMD:
IndexTemplateMetaData.Builder templateBuilder = IndexTemplateMetaData.builder("name1")
.patterns(Collections.singletonList("id*"))
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"));
.settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"));
metaData = MetaData.builder().put(templateBuilder).build();
indexRequest = new IndexRequest("idx");
result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
assertThat(indexRequest.getPipeline(), equalTo("_none"));
assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
}
public void testResolveRequiredOrDefaultAndRequiredPipeline() {
IndexTemplateMetaData.Builder builder1 = IndexTemplateMetaData.builder("name1")
.patterns(Collections.singletonList("i*"))
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"));
IndexTemplateMetaData.Builder builder2 = IndexTemplateMetaData.builder("name2")
.patterns(Collections.singletonList("id*"))
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"));
MetaData metaData = MetaData.builder().put(builder1).put(builder2).build();
IndexRequest indexRequest = new IndexRequest("idx");
Exception e = expectThrows(IllegalArgumentException.class,
() -> TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData));
assertThat(e.getMessage(),
equalTo("required pipeline [required-pipeline] and default pipeline [default-pipeline] can not both be set"));
}
public void testResolveRequiredOrDefaultPipelineRequestPipeline() {
public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
// no pipeline:
{
MetaData metaData = MetaData.builder().build();
IndexRequest indexRequest = new IndexRequest("idx");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(false));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
@ -262,7 +249,7 @@ public class TransportBulkActionTests extends ESTestCase {
{
MetaData metaData = MetaData.builder().build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("request-pipeline"));
@ -276,38 +263,25 @@ public class TransportBulkActionTests extends ESTestCase {
.numberOfReplicas(0);
MetaData metaData = MetaData.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("request-pipeline"));
}
// request pipeline with required pipeline:
// request pipeline with final pipeline:
{
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"))
.settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
MetaData metaData = MetaData.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
Exception e = expectThrows(IllegalArgumentException.class,
() -> TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData));
assertThat(e.getMessage(),
equalTo("request pipeline [request-pipeline] can not override required pipeline [required-pipeline]"));
}
// request pipeline set to required pipeline:
{
IndexMetaData.Builder builder = IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
MetaData metaData = MetaData.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("required-pipeline").isPipelineResolved(true);
boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData);
boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("required-pipeline"));
assertThat(indexRequest.getPipeline(), equalTo("request-pipeline"));
assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
}
}
}

View File

@ -0,0 +1,313 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.After;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasToString;
public class FinalPipelineIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(TestPlugin.class);
}
@After
public void cleanUpPipelines() {
final GetPipelineResponse response = client().admin()
.cluster()
.getPipeline(new GetPipelineRequest("default_pipeline", "final_pipeline", "request_pipeline"))
.actionGet();
for (final PipelineConfiguration pipeline : response.pipelines()) {
client().admin().cluster().deletePipeline(new DeletePipelineRequest(pipeline.getId())).actionGet();
}
}
public void testFinalPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);
// this asserts that the final_pipeline was used, without us having to actually create the pipeline etc.
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(e, hasToString(containsString("pipeline with id [final_pipeline] does not exist")));
}
public void testRequestPipelineAndFinalPipeline() {
final BytesReference requestPipelineBody = new BytesArray("{\"processors\": [{\"request\": {}}]}");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("request_pipeline", requestPipelineBody, XContentType.JSON))
.actionGet();
final BytesReference finalPipelineBody = new BytesArray("{\"processors\": [{\"final\": {\"exists\":\"request\"}}]}");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON))
.actionGet();
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);
final IndexRequestBuilder index = client().prepareIndex("index", "_doc", "1");
index.setSource(Collections.singletonMap("field", "value"));
index.setPipeline("request_pipeline");
index.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
final IndexResponse response = index.get();
assertThat(response.status(), equalTo(RestStatus.CREATED));
final GetRequestBuilder get = client().prepareGet("index", "_doc", "1");
final GetResponse getResponse = get.get();
assertTrue(getResponse.isExists());
final Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source, hasKey("request"));
assertTrue((boolean) source.get("request"));
assertThat(source, hasKey("final"));
assertTrue((boolean) source.get("final"));
}
public void testDefaultAndFinalPipeline() {
final BytesReference defaultPipelineBody = new BytesArray("{\"processors\": [{\"default\": {}}]}");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();
final BytesReference finalPipelineBody = new BytesArray("{\"processors\": [{\"final\": {\"exists\":\"default\"}}]}");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON))
.actionGet();
final Settings settings = Settings.builder()
.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
.put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline")
.build();
createIndex("index", settings);
final IndexRequestBuilder index = client().prepareIndex("index", "_doc", "1");
index.setSource(Collections.singletonMap("field", "value"));
index.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
final IndexResponse response = index.get();
assertThat(response.status(), equalTo(RestStatus.CREATED));
final GetRequestBuilder get = client().prepareGet("index", "_doc", "1");
final GetResponse getResponse = get.get();
assertTrue(getResponse.isExists());
final Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source, hasKey("default"));
assertTrue((boolean) source.get("default"));
assertThat(source, hasKey("final"));
assertTrue((boolean) source.get("final"));
}
public void testDefaultAndFinalPipelineFromTemplates() {
final BytesReference defaultPipelineBody = new BytesArray("{\"processors\": [{\"default\": {}}]}");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();
final BytesReference finalPipelineBody = new BytesArray("{\"processors\": [{\"final\": {\"exists\":\"default\"}}]}");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON))
.actionGet();
final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
final int finalPipelineOrder;
final int defaultPipelineOrder;
if (randomBoolean()) {
defaultPipelineOrder = lowOrder;
finalPipelineOrder = highOrder;
} else {
defaultPipelineOrder = highOrder;
finalPipelineOrder = lowOrder;
}
final Settings defaultPipelineSettings =
Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
admin().indices()
.preparePutTemplate("default")
.setPatterns(Collections.singletonList("index*"))
.setOrder(defaultPipelineOrder)
.setSettings(defaultPipelineSettings)
.get();
final Settings finalPipelineSettings =
Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
admin().indices()
.preparePutTemplate("final")
.setPatterns(Collections.singletonList("index*"))
.setOrder(finalPipelineOrder)
.setSettings(finalPipelineSettings)
.get();
final IndexRequestBuilder index = client().prepareIndex("index", "_doc", "1");
index.setSource(Collections.singletonMap("field", "value"));
index.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
final IndexResponse response = index.get();
assertThat(response.status(), equalTo(RestStatus.CREATED));
final GetRequestBuilder get = client().prepareGet("index", "_doc", "1");
final GetResponse getResponse = get.get();
assertTrue(getResponse.isExists());
final Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source, hasKey("default"));
assertTrue((boolean) source.get("default"));
assertThat(source, hasKey("final"));
assertTrue((boolean) source.get("final"));
}
public void testHighOrderFinalPipelinePreferred() throws IOException {
final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
final Settings lowOrderFinalPipelineSettings =
Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "low_order_final_pipeline").build();
admin().indices()
.preparePutTemplate("low_order")
.setPatterns(Collections.singletonList("index*"))
.setOrder(lowOrder)
.setSettings(lowOrderFinalPipelineSettings)
.get();
final Settings highOrderFinalPipelineSettings =
Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "high_order_final_pipeline").build();
admin().indices()
.preparePutTemplate("high_order")
.setPatterns(Collections.singletonList("index*"))
.setOrder(highOrder)
.setSettings(highOrderFinalPipelineSettings)
.get();
// this asserts that the high_order_final_pipeline was selected, without us having to actually create the pipeline etc.
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(e, hasToString(containsString("pipeline with id [high_order_final_pipeline] does not exist")));
}
public static class TestPlugin extends Plugin implements IngestPlugin {
@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
return Collections.emptyList();
}
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
final HashMap<String, Processor.Factory> map = new HashMap<>(3);
map.put(
"default",
(factories, tag, config) ->
new AbstractProcessor(tag) {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue("default", true);
return ingestDocument;
}
@Override
public String getType() {
return "default";
}
});
map.put(
"final",
(processorFactories, tag, config) -> {
final String exists = (String) config.remove("exists");
return new AbstractProcessor(tag) {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
// this asserts that this pipeline is the final pipeline executed
if (exists != null) {
if (ingestDocument.getSourceAndMetadata().containsKey(exists) == false) {
throw new AssertionError(
"expected document to contain [" + exists + "] but was [" + ingestDocument.getSourceAndMetadata());
}
}
ingestDocument.setFieldValue("final", true);
return ingestDocument;
}
@Override
public String getType() {
return "final";
}
};
});
map.put(
"request",
(processorFactories, tag, config) ->
new AbstractProcessor(tag) {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue("request", true);
return ingestDocument;
}
@Override
public String getType() {
return "request";
}
}
);
return map;
}
}
}

View File

@ -1,132 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasToString;
public class RequiredPipelineIT extends ESIntegTestCase {
public void testRequiredPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
createIndex("index", settings);
// this asserts that the required_pipeline was used, without us having to actually create the pipeline etc.
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(e, hasToString(containsString("pipeline with id [required_pipeline] does not exist")));
}
public void testDefaultAndRequiredPipeline() {
final Settings settings = Settings.builder()
.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
.put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline")
.build();
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> createIndex("index", settings));
assertThat(
e,
hasToString(containsString("index has a default pipeline [default_pipeline] and a required pipeline [required_pipeline]")));
}
public void testDefaultAndRequiredPipelineFromTemplates() {
final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
final int requiredPipelineOrder;
final int defaultPipelineOrder;
if (randomBoolean()) {
defaultPipelineOrder = lowOrder;
requiredPipelineOrder = highOrder;
} else {
defaultPipelineOrder = highOrder;
requiredPipelineOrder = lowOrder;
}
final Settings defaultPipelineSettings =
Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
admin().indices()
.preparePutTemplate("default")
.setPatterns(Collections.singletonList("index*"))
.setOrder(defaultPipelineOrder)
.setSettings(defaultPipelineSettings)
.get();
final Settings requiredPipelineSettings =
Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
admin().indices()
.preparePutTemplate("required")
.setPatterns(Collections.singletonList("index*"))
.setOrder(requiredPipelineOrder)
.setSettings(requiredPipelineSettings)
.get();
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(
e,
hasToString(containsString(
"required pipeline [required_pipeline] and default pipeline [default_pipeline] can not both be set")));
}
public void testHighOrderRequiredPipelinePreferred() throws IOException {
final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
final Settings defaultPipelineSettings =
Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "low_order_required_pipeline").build();
admin().indices()
.preparePutTemplate("default")
.setPatterns(Collections.singletonList("index*"))
.setOrder(lowOrder)
.setSettings(defaultPipelineSettings)
.get();
final Settings requiredPipelineSettings =
Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "high_order_required_pipeline").build();
admin().indices()
.preparePutTemplate("required")
.setPatterns(Collections.singletonList("index*"))
.setOrder(highOrder)
.setSettings(requiredPipelineSettings)
.get();
// this asserts that the high_order_required_pipeline was selected, without us having to actually create the pipeline etc.
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(e, hasToString(containsString("pipeline with id [high_order_required_pipeline] does not exist")));
}
public void testRequiredPipelineAndRequestPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
createIndex("index", settings);
final IndexRequestBuilder builder = client().prepareIndex("index", "_doc", "1");
builder.setSource(Collections.singletonMap("field", "value"));
builder.setPipeline("request_pipeline");
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::get);
assertThat(
e,
hasToString(containsString("request pipeline [request_pipeline] can not override required pipeline [required_pipeline]")));
}
}

View File

@ -133,7 +133,8 @@ public class IngestServiceTests extends ESTestCase {
when(threadPool.executor(anyString())).thenReturn(executorService);
IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null,
null, Collections.singletonList(DUMMY_PLUGIN), client);
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
final SetOnce<Boolean> failure = new SetOnce<>();
final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
@ -641,7 +642,8 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final SetOnce<Boolean> failure = new SetOnce<>();
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id);
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getMessage(), equalTo("error"));
@ -672,8 +674,10 @@ public class IngestServiceTests extends ESTestCase {
IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist");
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap())
.setPipeline("does_not_exist")
.setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
@SuppressWarnings("unchecked")
BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@ -708,7 +712,8 @@ public class IngestServiceTests extends ESTestCase {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
@ -726,7 +731,8 @@ public class IngestServiceTests extends ESTestCase {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
@ -765,7 +771,8 @@ public class IngestServiceTests extends ESTestCase {
handler.accept(ingestDocument, null);
return null;
}).when(processor).execute(any(), any());
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
@ -792,7 +799,8 @@ public class IngestServiceTests extends ESTestCase {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
doThrow(new RuntimeException())
.when(processor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@ -836,7 +844,8 @@ public class IngestServiceTests extends ESTestCase {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
@ -864,7 +873,8 @@ public class IngestServiceTests extends ESTestCase {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
doThrow(new RuntimeException())
.when(onFailureOnFailureProcessor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@ -899,7 +909,8 @@ public class IngestServiceTests extends ESTestCase {
request = new UpdateRequest("_index", "_type", "_id");
}
} else {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId);
IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId).setFinalPipeline("_none");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
request = indexRequest;
numIndexRequests++;
@ -952,7 +963,8 @@ public class IngestServiceTests extends ESTestCase {
logger.info("Using [{}], not randomly determined default [{}]", xContentType, Requests.INDEX_CONTENT_TYPE);
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId);
IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId).setFinalPipeline("_none");
indexRequest.source(xContentType, "field1", "value1");
bulkRequest.add(indexRequest);
}
@ -1036,7 +1048,7 @@ public class IngestServiceTests extends ESTestCase {
@SuppressWarnings("unchecked") final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1");
indexRequest.setPipeline("_id1").setFinalPipeline("_none");
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats();
@ -1163,7 +1175,8 @@ public class IngestServiceTests extends ESTestCase {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")

View File

@ -397,7 +397,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
nonReplicatedSettings.add(IndexSettings.MAX_SLICES_PER_SCROLL);
nonReplicatedSettings.add(IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING);
nonReplicatedSettings.add(IndexSettings.DEFAULT_PIPELINE);
nonReplicatedSettings.add(IndexSettings.REQUIRED_PIPELINE);
nonReplicatedSettings.add(IndexSettings.FINAL_PIPELINE);
nonReplicatedSettings.add(IndexSettings.INDEX_SEARCH_THROTTLED);
nonReplicatedSettings.add(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING);