From 01d7020ee37fd71ac9b5e5f1d9e7c912d74415ce Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 1 Jul 2016 09:15:05 -0700 Subject: [PATCH] Skip the execution of an empty pipeline (#19200) main optimization: `sourceToMap` is not called, therefore avoiding creation of Map of Maps Closes #19192. --- .../ingest/PipelineExecutionService.java | 4 ++++ .../ingest/PipelineExecutionServiceTests.java | 23 +++++++++++++++++-- .../ingest/PipelineFactoryTests.java | 12 ++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index ef726e44843..e47c4db2340 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -143,6 +143,10 @@ public class PipelineExecutionService implements ClusterStateListener { } private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { + if (pipeline.getProcessors().isEmpty()) { + return; + } + long startTimeInNanos = System.nanoTime(); // the pipeline specific stat holder may not exist and that is fine: // (e.g. the pipeline may have been removed while we're ingesting a document diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 8bf6f77a026..fcc6e04c6c1 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -134,8 +134,25 @@ public class PipelineExecutionServiceTests extends ESTestCase { verify(completionHandler, times(1)).accept(true); } + public void testExecuteEmptyPipeline() throws Exception { + CompoundProcessor processor = mock(CompoundProcessor.class); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); + when(processor.getProcessors()).thenReturn(Collections.emptyList()); + + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + Consumer failureHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); + verify(processor, never()).execute(any()); + verify(failureHandler, never()).accept(any()); + verify(completionHandler, times(1)).accept(true); + } + public void testExecutePropagateAllMetaDataUpdates() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); + when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); doAnswer((InvocationOnMock invocationOnMock) -> { IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { @@ -171,6 +188,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteFailure() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); + when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @@ -313,6 +331,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { } CompoundProcessor processor = mock(CompoundProcessor.class); + when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); Exception error = new RuntimeException(); doThrow(error).when(processor).execute(any()); when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor)); @@ -356,8 +375,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); - when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, new CompoundProcessor())); - when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, new CompoundProcessor())); + when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, new CompoundProcessor(mock(Processor.class)))); + when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, new CompoundProcessor(mock(Processor.class)))); Map configurationMap = new HashMap<>(); configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"))); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 8fe7ddd84cb..ced25419d92 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; @@ -69,6 +70,17 @@ public class PipelineFactoryTests extends ESTestCase { } } + public void testCreateWithEmptyProcessorsField() throws Exception { + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList()); + Pipeline.Factory factory = new Pipeline.Factory(); + Pipeline pipeline = factory.create("_id", pipelineConfig, null); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors(), is(empty())); + } + public void testCreateWithPipelineOnFailure() throws Exception { Map processorConfig = new HashMap<>(); Map pipelineConfig = new HashMap<>();