Add ingest processor existence helper method (#45156)

This commit adds a helper method to the ingest service allowing it to
inspect a pipeline by id and verify the existence of a processor in the
pipeline. This work exposed a potential bug in that some processors
contain inner processors that are passed in at instantiation. These
processors needed a common way to expose their inner processors, so the
WrappingProcessor was created in order to expose the inner processor.
This commit is contained in:
Michael Basnight 2019-08-07 10:52:14 -05:00
parent 341ab48ec0
commit 89861d0884
9 changed files with 272 additions and 43 deletions

View File

@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@ -43,7 +45,7 @@ import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
*
* Note that this processor is experimental.
*/
public final class ForEachProcessor extends AbstractProcessor {
public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {
public static final String TYPE = "foreach";
@ -97,7 +99,7 @@ public final class ForEachProcessor extends AbstractProcessor {
return field;
}
Processor getProcessor() {
public Processor getInnerProcessor() {
return processor;
}

View File

@ -49,7 +49,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor));
assertFalse(forEachProcessor.isIgnoreMissing());
}
@ -66,7 +66,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor));
assertTrue(forEachProcessor.isIgnoreMissing());
}

View File

@ -38,7 +38,7 @@ import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
public class ConditionalProcessor extends AbstractProcessor {
public class ConditionalProcessor extends AbstractProcessor implements WrappingProcessor {
private static final Map<String, String> DEPRECATIONS;
static {
@ -98,7 +98,7 @@ public class ConditionalProcessor extends AbstractProcessor {
new DeprecationMap(ingestDocument.getSourceAndMetadata(), DEPRECATIONS, "conditional-processor")));
}
Processor getProcessor() {
public Processor getInnerProcessor() {
return processor;
}

View File

@ -388,7 +388,7 @@ public class IngestService implements ClusterStateApplier {
static String getProcessorName(Processor processor){
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getProcessor();
processor = ((ConditionalProcessor) processor).getInnerProcessor();
}
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());
@ -561,6 +561,40 @@ public class IngestService implements ClusterStateApplier {
}
}
/**
* Determine if a pipeline contains a processor class within it by introspecting all of the processors within the pipeline.
* @param pipelineId the pipeline to inspect
* @param clazz the Processor class to look for
* @return True if the pipeline contains an instance of the Processor class passed in
*/
public boolean hasProcessor(String pipelineId, Class<? extends Processor> clazz) {
Pipeline pipeline = getPipeline(pipelineId);
if (pipeline == null) {
return false;
}
for (Processor processor: pipeline.flattenAllProcessors()) {
if (clazz.isAssignableFrom(processor.getClass())) {
return true;
}
while (processor instanceof WrappingProcessor) {
WrappingProcessor wrappingProcessor = (WrappingProcessor) processor;
if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) {
return true;
}
processor = wrappingProcessor.getInnerProcessor();
// break in the case of self referencing processors in the event a processor author creates a
// wrapping processor that has its inner processor refer to itself.
if (wrappingProcessor == processor) {
break;
}
}
}
return false;
}
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";

View File

@ -49,8 +49,8 @@ public final class TrackingResultProcessor implements Processor {
if (conditionalProcessor.evaluate(ingestDocument) == false) {
return ingestDocument;
}
if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getProcessor();
if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getInnerProcessor();
}
}
if (processor instanceof PipelineProcessor) {

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
/**
* A srapping processor is one that encapsulates an inner processor, or a processor that the wrapped processor enacts upon. All processors
* that contain an "inner" processor should implement this interface, such that the actual processor can be obtained.
*/
public interface WrappingProcessor extends Processor {
/**
* Method for retrieving the inner processor from a wrapped processor.
* @return the inner processor
*/
Processor getInnerProcessor();
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.util.function.Consumer;
class FakeProcessor implements Processor {
private String type;
private String tag;
private Consumer<IngestDocument> executor;
FakeProcessor(String type, String tag, Consumer<IngestDocument> executor) {
this.type = type;
this.tag = tag;
this.executor = executor;
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
executor.accept(ingestDocument);
return ingestDocument;
}
@Override
public String getType() {
return type;
}
@Override
public String getTag() {
return tag;
}
}

View File

@ -43,10 +43,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.threadpool.ThreadPool;
@ -64,6 +70,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -282,6 +289,89 @@ public class IngestServiceTests extends ESTestCase {
ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
}
public void testHasProcessor() throws Exception {
IngestService ingestService = createWithProcessors();
String id = "_id";
Pipeline pipeline = ingestService.getPipeline(id);
assertThat(pipeline, nullValue());
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," +
"{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"),
XContentType.JSON);
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
pipeline = ingestService.getPipeline(id);
assertThat(pipeline, notNullValue());
assertTrue(ingestService.hasProcessor(id, Processor.class));
assertTrue(ingestService.hasProcessor(id, WrappingProcessorImpl.class));
assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class));
assertTrue(ingestService.hasProcessor(id, FakeProcessor.class));
assertFalse(ingestService.hasProcessor(id, ConditionalProcessor.class));
}
public void testHasProcessorComplexConditional() throws Exception {
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(),
Collections.singletonMap(
Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(
Script.DEFAULT_SCRIPT_LANG,
Collections.singletonMap(
scriptName, ctx -> {
ctx.get("_type");
return true;
}
),
Collections.emptyMap()
)
),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put("complexSet", (factories, tag, config) -> {
String field = (String) config.remove("field");
String value = (String) config.remove("value");
return new ConditionalProcessor(randomAlphaOfLength(10),
new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
scriptName, Collections.emptyMap()), scriptService,
new ConditionalProcessor(randomAlphaOfLength(10) + "-nested",
new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
scriptName, Collections.emptyMap()), scriptService,
new FakeProcessor("complexSet", tag, (ingestDocument) -> ingestDocument.setFieldValue(field, value))));
});
IngestService ingestService = createWithProcessors(processors);
String id = "_id";
Pipeline pipeline = ingestService.getPipeline(id);
assertThat(pipeline, nullValue());
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
PutPipelineRequest putRequest = new PutPipelineRequest(id,
new BytesArray("{\"processors\": [{\"complexSet\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON);
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
pipeline = ingestService.getPipeline(id);
assertThat(pipeline, notNullValue());
assertTrue(ingestService.hasProcessor(id, Processor.class));
assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class));
assertTrue(ingestService.hasProcessor(id, FakeProcessor.class));
assertTrue(ingestService.hasProcessor(id, ConditionalProcessor.class));
assertFalse(ingestService.hasProcessor(id, WrappingProcessorImpl.class));
}
public void testCrud() throws Exception {
IngestService ingestService = createWithProcessors();
String id = "_id";
@ -965,7 +1055,7 @@ public class IngestServiceTests extends ESTestCase {
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class);
when(conditionalProcessor.getProcessor()).thenReturn(processor);
when(conditionalProcessor.getInnerProcessor()).thenReturn(processor);
assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag));
PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
@ -1031,42 +1121,11 @@ public class IngestServiceTests extends ESTestCase {
processors.put("set", (factories, tag, config) -> {
String field = (String) config.remove("field");
String value = (String) config.remove("value");
return new Processor() {
@Override
public IngestDocument execute(IngestDocument ingestDocument) {
ingestDocument.setFieldValue(field, value);
return ingestDocument;
}
@Override
public String getType() {
return "set";
}
@Override
public String getTag() {
return tag;
}
};
return new FakeProcessor("set", tag, (ingestDocument) ->ingestDocument.setFieldValue(field, value));
});
processors.put("remove", (factories, tag, config) -> {
String field = (String) config.remove("field");
return new Processor() {
@Override
public IngestDocument execute(IngestDocument ingestDocument) {
ingestDocument.removeField(field);
return ingestDocument;
}
@Override
public String getType() {
return "remove";
}
@Override
public String getTag() {
return tag;
}
return new WrappingProcessorImpl("remove", tag, (ingestDocument -> ingestDocument.removeField(field))) {
};
});
return createWithProcessors(processors);

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.util.function.Consumer;
class WrappingProcessorImpl extends FakeProcessor implements WrappingProcessor {
WrappingProcessorImpl(String type, String tag, Consumer<IngestDocument> executor) {
super(type, tag, executor);
}
@Override
public Processor getInnerProcessor() {
String theType = getType();
String theTag = getTag();
return new Processor() {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument;
}
@Override
public String getType() {
return theType;
}
@Override
public String getTag() {
return theTag;
}
};
}
}