diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 11ffca49fd5..439e0bea094 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -562,26 +562,27 @@ public class IngestService implements ClusterStateApplier { } /** - * Determine if a pipeline contains a processor class within it by introspecting all of the processors within the pipeline. + * Gets all the Processors of the given type from within a Pipeline. * @param pipelineId the pipeline to inspect * @param clazz the Processor class to look for * @return True if the pipeline contains an instance of the Processor class passed in */ - public boolean hasProcessor(String pipelineId, Class clazz) { + public

List

getProcessorsInPipeline(String pipelineId, Class

clazz) { Pipeline pipeline = getPipeline(pipelineId); if (pipeline == null) { - return false; + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } + List

processors = new ArrayList<>(); for (Processor processor: pipeline.flattenAllProcessors()) { if (clazz.isAssignableFrom(processor.getClass())) { - return true; + processors.add(clazz.cast(processor)); } while (processor instanceof WrappingProcessor) { WrappingProcessor wrappingProcessor = (WrappingProcessor) processor; if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) { - return true; + processors.add(clazz.cast(wrappingProcessor.getInnerProcessor())); } processor = wrappingProcessor.getInnerProcessor(); // break in the case of self referencing processors in the event a processor author creates a @@ -592,7 +593,7 @@ public class IngestService implements ClusterStateApplier { } } - return false; + return processors; } private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index b9cadba5ec7..e10b1cd154d 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -289,7 +289,7 @@ public class IngestServiceTests extends ESTestCase { ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); } - public void testHasProcessor() throws Exception { + public void testGetProcessorsInPipeline() throws Exception { IngestService ingestService = createWithProcessors(); String id = "_id"; Pipeline pipeline = ingestService.getPipeline(id); @@ -306,15 +306,19 @@ public class IngestServiceTests extends ESTestCase { pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); - assertTrue(ingestService.hasProcessor(id, Processor.class)); - assertTrue(ingestService.hasProcessor(id, WrappingProcessorImpl.class)); - assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class)); - assertTrue(ingestService.hasProcessor(id, FakeProcessor.class)); + assertThat(ingestService.getProcessorsInPipeline(id, Processor.class).size(), equalTo(3)); + assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessorImpl.class).size(), equalTo(1)); + assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessor.class).size(), equalTo(1)); + assertThat(ingestService.getProcessorsInPipeline(id, FakeProcessor.class).size(), equalTo(2)); - assertFalse(ingestService.hasProcessor(id, ConditionalProcessor.class)); + assertThat(ingestService.getProcessorsInPipeline(id, ConditionalProcessor.class).size(), equalTo(0)); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> ingestService.getProcessorsInPipeline("fakeID", Processor.class)); + assertThat("pipeline with id [fakeID] does not exist", equalTo(e.getMessage())); } - public void testHasProcessorComplexConditional() throws Exception { + public void testGetProcessorsInPipelineComplexConditional() throws Exception { LongSupplier relativeTimeProvider = mock(LongSupplier.class); String scriptName = "conditionalScript"; ScriptService scriptService = new ScriptService(Settings.builder().build(), @@ -364,12 +368,12 @@ public class IngestServiceTests extends ESTestCase { pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); - assertTrue(ingestService.hasProcessor(id, Processor.class)); - assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class)); - assertTrue(ingestService.hasProcessor(id, FakeProcessor.class)); - assertTrue(ingestService.hasProcessor(id, ConditionalProcessor.class)); + assertThat(ingestService.getProcessorsInPipeline(id, Processor.class).size(), equalTo(3)); + assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessor.class).size(), equalTo(2)); + assertThat(ingestService.getProcessorsInPipeline(id, FakeProcessor.class).size(), equalTo(1)); + assertThat(ingestService.getProcessorsInPipeline(id, ConditionalProcessor.class).size(), equalTo(2)); - assertFalse(ingestService.hasProcessor(id, WrappingProcessorImpl.class)); + assertThat(ingestService.getProcessorsInPipeline(id, WrappingProcessorImpl.class).size(), equalTo(0)); } public void testCrud() throws Exception {