Retrieve processors instead of checking existence (#45354)

The previous hasProcessors method would validate if a processor was
present within a pipeline, but would not return the contents of the
processors. This does not allow a consumer to inspect the processor for
specific metadata. The method now returns the list of processors based
on the class of the processor passed in.
This commit is contained in:
Michael Basnight 2019-08-12 13:32:25 -05:00
parent 472f6ef41a
commit a521e4c86f
2 changed files with 23 additions and 18 deletions

View File

@ -562,26 +562,27 @@ public class IngestService implements ClusterStateApplier {
} }
/** /**
* Determine if a pipeline contains a processor class within it by introspecting all of the processors within the pipeline. * Gets all the Processors of the given type from within a Pipeline.
* @param pipelineId the pipeline to inspect * @param pipelineId the pipeline to inspect
* @param clazz the Processor class to look for * @param clazz the Processor class to look for
* @return True if the pipeline contains an instance of the Processor class passed in * @return True if the pipeline contains an instance of the Processor class passed in
*/ */
public boolean hasProcessor(String pipelineId, Class<? extends Processor> clazz) { public<P extends Processor> List<P> getProcessorsInPipeline(String pipelineId, Class<P> clazz) {
Pipeline pipeline = getPipeline(pipelineId); Pipeline pipeline = getPipeline(pipelineId);
if (pipeline == null) { if (pipeline == null) {
return false; throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
} }
List<P> processors = new ArrayList<>();
for (Processor processor: pipeline.flattenAllProcessors()) { for (Processor processor: pipeline.flattenAllProcessors()) {
if (clazz.isAssignableFrom(processor.getClass())) { if (clazz.isAssignableFrom(processor.getClass())) {
return true; processors.add(clazz.cast(processor));
} }
while (processor instanceof WrappingProcessor) { while (processor instanceof WrappingProcessor) {
WrappingProcessor wrappingProcessor = (WrappingProcessor) processor; WrappingProcessor wrappingProcessor = (WrappingProcessor) processor;
if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) { if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) {
return true; processors.add(clazz.cast(wrappingProcessor.getInnerProcessor()));
} }
processor = wrappingProcessor.getInnerProcessor(); processor = wrappingProcessor.getInnerProcessor();
// break in the case of self referencing processors in the event a processor author creates a // break in the case of self referencing processors in the event a processor author creates a
@ -592,7 +593,7 @@ public class IngestService implements ClusterStateApplier {
} }
} }
return false; return processors;
} }
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {

View File

@ -289,7 +289,7 @@ public class IngestServiceTests extends ESTestCase {
ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
} }
public void testHasProcessor() throws Exception { public void testGetProcessorsInPipeline() throws Exception {
IngestService ingestService = createWithProcessors(); IngestService ingestService = createWithProcessors();
String id = "_id"; String id = "_id";
Pipeline pipeline = ingestService.getPipeline(id); Pipeline pipeline = ingestService.getPipeline(id);
@ -306,15 +306,19 @@ public class IngestServiceTests extends ESTestCase {
pipeline = ingestService.getPipeline(id); pipeline = ingestService.getPipeline(id);
assertThat(pipeline, notNullValue()); assertThat(pipeline, notNullValue());
assertTrue(ingestService.hasProcessor(id, Processor.class)); assertThat(ingestService.getProcessorsInPipeline(id, Processor.class).size(), equalTo(3));
assertTrue(ingestService.hasProcessor(id, WrappingProcessorImpl.class)); assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessorImpl.class).size(), equalTo(1));
assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class)); assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessor.class).size(), equalTo(1));
assertTrue(ingestService.hasProcessor(id, FakeProcessor.class)); assertThat(ingestService.getProcessorsInPipeline(id, FakeProcessor.class).size(), equalTo(2));
assertFalse(ingestService.hasProcessor(id, ConditionalProcessor.class)); assertThat(ingestService.getProcessorsInPipeline(id, ConditionalProcessor.class).size(), equalTo(0));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> ingestService.getProcessorsInPipeline("fakeID", Processor.class));
assertThat("pipeline with id [fakeID] does not exist", equalTo(e.getMessage()));
} }
public void testHasProcessorComplexConditional() throws Exception { public void testGetProcessorsInPipelineComplexConditional() throws Exception {
LongSupplier relativeTimeProvider = mock(LongSupplier.class); LongSupplier relativeTimeProvider = mock(LongSupplier.class);
String scriptName = "conditionalScript"; String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(), ScriptService scriptService = new ScriptService(Settings.builder().build(),
@ -364,12 +368,12 @@ public class IngestServiceTests extends ESTestCase {
pipeline = ingestService.getPipeline(id); pipeline = ingestService.getPipeline(id);
assertThat(pipeline, notNullValue()); assertThat(pipeline, notNullValue());
assertTrue(ingestService.hasProcessor(id, Processor.class)); assertThat(ingestService.getProcessorsInPipeline(id, Processor.class).size(), equalTo(3));
assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class)); assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessor.class).size(), equalTo(2));
assertTrue(ingestService.hasProcessor(id, FakeProcessor.class)); assertThat(ingestService.getProcessorsInPipeline(id, FakeProcessor.class).size(), equalTo(1));
assertTrue(ingestService.hasProcessor(id, ConditionalProcessor.class)); assertThat(ingestService.getProcessorsInPipeline(id, ConditionalProcessor.class).size(), equalTo(2));
assertFalse(ingestService.hasProcessor(id, WrappingProcessorImpl.class)); assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessorImpl.class).size(), equalTo(0));
} }
public void testCrud() throws Exception { public void testCrud() throws Exception {