From 6a7309c09aee155bfb78f921714ae2111c601031 Mon Sep 17 00:00:00 2001 From: Chris Earle Date: Tue, 6 Sep 2016 11:32:43 -0400 Subject: [PATCH] Add "version" field to Pipelines This adds a version field to Pipelines, which is itself is unused by Elasticsearch, but exists for users to better manage their own pipelines. --- .../ingest/ConfigurationUtils.java | 5 +- .../org/elasticsearch/ingest/Pipeline.java | 22 ++++- .../ingest/IngestActionFilterTests.java | 2 +- .../ingest/SimulateExecutionServiceTests.java | 16 ++-- .../SimulatePipelineRequestParsingTests.java | 2 +- .../ingest/PipelineExecutionServiceTests.java | 37 ++++---- .../ingest/PipelineFactoryTests.java | 22 ++++- .../rest-api-spec/test/ingest/10_basic.yaml | 90 +++++++++++++++++++ 8 files changed, 163 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 3f725c43b25..908e3446980 100644 --- a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -107,15 +107,14 @@ public final class ConfigurationUtils { value.getClass().getName() + "]"); } - /** * Returns and removes the specified property from the specified configuration map. * * If the property value isn't of type int a {@link ElasticsearchParseException} is thrown. * If the property is missing an {@link ElasticsearchParseException} is thrown */ - public static int readIntProperty(String processorType, String processorTag, Map configuration, - String propertyName, int defaultValue) { + public static Integer readIntProperty(String processorType, String processorTag, Map configuration, + String propertyName, Integer defaultValue) { Object value = configuration.remove(propertyName); if (value == null) { return defaultValue; diff --git a/core/src/main/java/org/elasticsearch/ingest/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java index e29d206543c..4a705c43bac 100644 --- a/core/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.Nullable; import java.util.Arrays; import java.util.Collections; @@ -33,16 +34,21 @@ public final class Pipeline { static final String DESCRIPTION_KEY = "description"; static final String PROCESSORS_KEY = "processors"; + static final String VERSION_KEY = "version"; static final String ON_FAILURE_KEY = "on_failure"; private final String id; + @Nullable private final String description; + @Nullable + private final Integer version; private final CompoundProcessor compoundProcessor; - public Pipeline(String id, String description, CompoundProcessor compoundProcessor) { + public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { this.id = id; this.description = description; this.compoundProcessor = compoundProcessor; + this.version = version; } /** @@ -62,10 +68,21 @@ public final class Pipeline { /** * An optional description of what this pipeline is doing to the data gets processed by this pipeline. */ + @Nullable public String getDescription() { return description; } + /** + * An optional version stored with the pipeline so that it can be used to determine if the pipeline should be updated / replaced. + * + * @return {@code null} if not supplied. + */ + @Nullable + public Integer getVersion() { + return version; + } + /** * Get the underlying {@link CompoundProcessor} containing the Pipeline's processors */ @@ -100,6 +117,7 @@ public final class Pipeline { public Pipeline create(String id, Map config, Map processorFactories) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); + Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); List>> onFailureProcessorConfigs = @@ -114,7 +132,7 @@ public final class Pipeline { } CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); - return new Pipeline(id, description, compoundProcessor); + return new Pipeline(id, description, version, compoundProcessor); } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index b04533fafc4..1316c87e2aa 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -162,7 +162,7 @@ public class IngestActionFilterTests extends ESTestCase { PipelineStore store = mock(PipelineStore.class); Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2")); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", randomInt(), new CompoundProcessor(processor))); executionService = new PipelineExecutionService(store, threadPool); IngestService ingestService = mock(IngestService.class); when(ingestService.getPipelineExecutionService()).thenReturn(executionService); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 8cf05509813..8e32334a9db 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -43,6 +43,8 @@ import static org.hamcrest.Matchers.sameInstance; public class SimulateExecutionServiceTests extends ESTestCase { + private final Integer version = randomBoolean() ? randomInt() : null; + private ThreadPool threadPool; private SimulateExecutionService executionService; private IngestDocument ingestDocument; @@ -65,7 +67,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteVerboseItem() throws Exception { TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -90,7 +92,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteItem() throws Exception { TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); @@ -103,7 +105,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {}); TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2, processor3)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); @@ -127,7 +129,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {}); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2)), processor3)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); @@ -163,7 +165,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { RuntimeException exception = new RuntimeException("processor failed"); TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -179,7 +181,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception { TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -194,7 +196,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteItemWithFailure() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); }); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index 8418c886be9..ab5d30c6f9b 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -54,7 +54,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { public void init() throws IOException { TestProcessor processor = new TestProcessor(ingestDocument -> {}); CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor); - Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor); + Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor); Map registry = Collections.singletonMap("mock_processor", (factories, tag, config) -> processor); store = mock(PipelineStore.class); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 53964132abe..acf5c26e565 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -60,6 +60,7 @@ import static org.mockito.Mockito.when; public class PipelineExecutionServiceTests extends ESTestCase { + private final Integer version = randomBoolean() ? randomInt() : null; private PipelineStore store; private PipelineExecutionService executionService; @@ -89,7 +90,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteBulkPipelineDoesNotExist() { CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); BulkRequest bulkRequest = new BulkRequest(); IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @@ -122,7 +123,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteSuccess() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") @@ -136,7 +137,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteEmptyPipeline() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); when(processor.getProcessors()).thenReturn(Collections.emptyList()); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @@ -165,7 +166,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { } return null; }).when(processor).execute(any()); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") @@ -189,7 +190,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteFailure() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @SuppressWarnings("unchecked") @@ -209,7 +210,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Processor onFailureProcessor = mock(Processor.class); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @SuppressWarnings("unchecked") @@ -226,7 +227,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Processor onFailureProcessor = mock(Processor.class); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @@ -247,7 +248,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(false, Collections.singletonList(onFailureProcessor), Collections.singletonList(onFailureOnFailureProcessor)))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @@ -264,7 +265,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteSetTTL() throws Exception { Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "5d")); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, new CompoundProcessor(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") @@ -280,7 +281,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteSetInvalidTTL() throws Exception { Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "abc")); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, new CompoundProcessor(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") @@ -293,12 +294,14 @@ public class PipelineExecutionServiceTests extends ESTestCase { } public void testExecuteProvidedTTL() throws Exception { - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", mock(CompoundProcessor.class))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, mock(CompoundProcessor.class))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id") .source(Collections.emptyMap()) .ttl(1000L); + @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); @@ -334,9 +337,11 @@ public class PipelineExecutionServiceTests extends ESTestCase { when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); Exception error = new RuntimeException(); doThrow(error).when(processor).execute(any()); - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor)); + when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, processor)); + @SuppressWarnings("unchecked") BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); @@ -355,7 +360,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { bulkRequest.add(indexRequest); } - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor())); + when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, new CompoundProcessor())); @SuppressWarnings("unchecked") BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @@ -375,15 +380,17 @@ public class PipelineExecutionServiceTests extends ESTestCase { assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); - when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, new CompoundProcessor(mock(Processor.class)))); - when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, new CompoundProcessor(mock(Processor.class)))); + when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, version, new CompoundProcessor(mock(Processor.class)))); + when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, null, new CompoundProcessor(mock(Processor.class)))); Map configurationMap = new HashMap<>(); configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"))); configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"))); executionService.updatePipelineStats(new IngestMetadata(configurationMap)); + @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); IndexRequest indexRequest = new IndexRequest("_index"); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index b09d772729c..461873a3fe3 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -20,8 +20,6 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -34,16 +32,19 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; public class PipelineFactoryTests extends ESTestCase { + private final Integer version = randomBoolean() ? randomInt() : null; + private final String versionString = version != null ? Integer.toString(version) : null; + public void testCreate() throws Exception { Map processorConfig0 = new HashMap<>(); Map processorConfig1 = new HashMap<>(); processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor"); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); Pipeline.Factory factory = new Pipeline.Factory(); @@ -51,6 +52,7 @@ public class PipelineFactoryTests extends ESTestCase { Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getVersion(), equalTo(version)); assertThat(pipeline.getProcessors().size(), equalTo(2)); assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor")); assertThat(pipeline.getProcessors().get(0).getTag(), equalTo("first-processor")); @@ -61,6 +63,7 @@ public class PipelineFactoryTests extends ESTestCase { public void testCreateWithNoProcessorsField() throws Exception { Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); Pipeline.Factory factory = new Pipeline.Factory(); try { factory.create("_id", pipelineConfig, Collections.emptyMap()); @@ -73,11 +76,13 @@ public class PipelineFactoryTests extends ESTestCase { public void testCreateWithEmptyProcessorsField() throws Exception { Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList()); Pipeline.Factory factory = new Pipeline.Factory(); Pipeline pipeline = factory.create("_id", pipelineConfig, null); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getVersion(), equalTo(version)); assertThat(pipeline.getProcessors(), is(empty())); } @@ -85,6 +90,7 @@ public class PipelineFactoryTests extends ESTestCase { Map processorConfig = new HashMap<>(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); @@ -92,6 +98,7 @@ public class PipelineFactoryTests extends ESTestCase { Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getVersion(), equalTo(version)); assertThat(pipeline.getProcessors().size(), equalTo(1)); assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor")); assertThat(pipeline.getOnFailureProcessors().size(), equalTo(1)); @@ -102,6 +109,7 @@ public class PipelineFactoryTests extends ESTestCase { Map processorConfig = new HashMap<>(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList()); Pipeline.Factory factory = new Pipeline.Factory(); @@ -115,6 +123,7 @@ public class PipelineFactoryTests extends ESTestCase { processorConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList()); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); @@ -130,12 +139,14 @@ public class PipelineFactoryTests extends ESTestCase { Pipeline.Factory factory = new Pipeline.Factory(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getVersion(), equalTo(version)); assertThat(pipeline.getProcessors().size(), equalTo(1)); assertThat(pipeline.getOnFailureProcessors().size(), equalTo(0)); @@ -149,6 +160,7 @@ public class PipelineFactoryTests extends ESTestCase { processorConfig.put("unused", "value"); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); @@ -162,12 +174,14 @@ public class PipelineFactoryTests extends ESTestCase { Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getVersion(), equalTo(version)); assertThat(pipeline.getProcessors().size(), equalTo(1)); assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound")); } @@ -177,7 +191,7 @@ public class PipelineFactoryTests extends ESTestCase { CompoundProcessor processor1 = new CompoundProcessor(testProcessor, testProcessor); CompoundProcessor processor2 = new CompoundProcessor(false, Collections.singletonList(testProcessor), Collections.singletonList(testProcessor)); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2)); List flattened = pipeline.flattenAllProcessors(); assertThat(flattened.size(), equalTo(4)); } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_basic.yaml index 6e822bf1da0..e5b9b0fd5eb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_basic.yaml @@ -27,6 +27,96 @@ id: "my_pipeline" --- +"Test Put Versioned Pipeline": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 10, + "processors": [ ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 10 } + + # Lower version + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 9, + "processors": [ ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 9 } + + # Higher version + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 6789, + "processors": [ ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 6789 } + + # No version + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [ ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - is_false: my_pipeline.version + + # Coming back with a version + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 5385, + "processors": [ ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 5385 } + + # Able to delete the versioned pipeline + - do: + ingest.delete_pipeline: + id: "my_pipeline" + - match: { acknowledged: true } + + - do: + catch: missing + ingest.get_pipeline: + id: "my_pipeline" +--- "Test Get All Pipelines": - do: ingest.put_pipeline: