Skip the execution of an empty pipeline (#19200)
main optimization: `sourceToMap` is not called, therefore avoiding creation of Map of Maps Closes #19192.
This commit is contained in:
parent
b66fc308fc
commit
01d7020ee3
|
@ -143,6 +143,10 @@ public class PipelineExecutionService implements ClusterStateListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
|
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
|
||||||
|
if (pipeline.getProcessors().isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
long startTimeInNanos = System.nanoTime();
|
long startTimeInNanos = System.nanoTime();
|
||||||
// the pipeline specific stat holder may not exist and that is fine:
|
// the pipeline specific stat holder may not exist and that is fine:
|
||||||
// (e.g. the pipeline may have been removed while we're ingesting a document
|
// (e.g. the pipeline may have been removed while we're ingesting a document
|
||||||
|
|
|
@ -134,8 +134,25 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||||
verify(completionHandler, times(1)).accept(true);
|
verify(completionHandler, times(1)).accept(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testExecuteEmptyPipeline() throws Exception {
|
||||||
|
CompoundProcessor processor = mock(CompoundProcessor.class);
|
||||||
|
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
|
||||||
|
when(processor.getProcessors()).thenReturn(Collections.emptyList());
|
||||||
|
|
||||||
|
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||||
|
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||||
|
verify(processor, never()).execute(any());
|
||||||
|
verify(failureHandler, never()).accept(any());
|
||||||
|
verify(completionHandler, times(1)).accept(true);
|
||||||
|
}
|
||||||
|
|
||||||
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
|
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
|
||||||
CompoundProcessor processor = mock(CompoundProcessor.class);
|
CompoundProcessor processor = mock(CompoundProcessor.class);
|
||||||
|
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
|
||||||
doAnswer((InvocationOnMock invocationOnMock) -> {
|
doAnswer((InvocationOnMock invocationOnMock) -> {
|
||||||
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
|
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
|
||||||
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
|
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
|
||||||
|
@ -171,6 +188,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||||
|
|
||||||
public void testExecuteFailure() throws Exception {
|
public void testExecuteFailure() throws Exception {
|
||||||
CompoundProcessor processor = mock(CompoundProcessor.class);
|
CompoundProcessor processor = mock(CompoundProcessor.class);
|
||||||
|
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
|
||||||
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
|
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
|
||||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
|
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
|
||||||
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
|
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
|
||||||
|
@ -313,6 +331,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
CompoundProcessor processor = mock(CompoundProcessor.class);
|
CompoundProcessor processor = mock(CompoundProcessor.class);
|
||||||
|
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
|
||||||
Exception error = new RuntimeException();
|
Exception error = new RuntimeException();
|
||||||
doThrow(error).when(processor).execute(any());
|
doThrow(error).when(processor).execute(any());
|
||||||
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
|
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
|
||||||
|
@ -356,8 +375,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||||
assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
|
assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
|
||||||
assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
|
assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
|
||||||
|
|
||||||
when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, new CompoundProcessor()));
|
when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, new CompoundProcessor(mock(Processor.class))));
|
||||||
when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, new CompoundProcessor()));
|
when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, new CompoundProcessor(mock(Processor.class))));
|
||||||
|
|
||||||
Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
|
Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
|
||||||
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}")));
|
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}")));
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -69,6 +70,17 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCreateWithEmptyProcessorsField() throws Exception {
|
||||||
|
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||||
|
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
|
||||||
|
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList());
|
||||||
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
|
Pipeline pipeline = factory.create("_id", pipelineConfig, null);
|
||||||
|
assertThat(pipeline.getId(), equalTo("_id"));
|
||||||
|
assertThat(pipeline.getDescription(), equalTo("_description"));
|
||||||
|
assertThat(pipeline.getProcessors(), is(empty()));
|
||||||
|
}
|
||||||
|
|
||||||
public void testCreateWithPipelineOnFailure() throws Exception {
|
public void testCreateWithPipelineOnFailure() throws Exception {
|
||||||
Map<String, Object> processorConfig = new HashMap<>();
|
Map<String, Object> processorConfig = new HashMap<>();
|
||||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||||
|
|
Loading…
Reference in New Issue