Add "version" field to Pipelines

This adds a version field to Pipelines, which is itself is unused by Elasticsearch, but exists for users to better manage their own pipelines.
This commit is contained in:
Chris Earle 2016-09-06 11:32:43 -04:00
parent bebdec570f
commit 6a7309c09a
8 changed files with 163 additions and 33 deletions

View File

@ -107,15 +107,14 @@ public final class ConfigurationUtils {
value.getClass().getName() + "]");
}
/**
* Returns and removes the specified property from the specified configuration map.
*
* If the property value isn't of type int a {@link ElasticsearchParseException} is thrown.
* If the property is missing an {@link ElasticsearchParseException} is thrown
*/
public static int readIntProperty(String processorType, String processorTag, Map<String, Object> configuration,
String propertyName, int defaultValue) {
public static Integer readIntProperty(String processorType, String processorTag, Map<String, Object> configuration,
String propertyName, Integer defaultValue) {
Object value = configuration.remove(propertyName);
if (value == null) {
return defaultValue;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import java.util.Arrays;
import java.util.Collections;
@ -33,16 +34,21 @@ public final class Pipeline {
static final String DESCRIPTION_KEY = "description";
static final String PROCESSORS_KEY = "processors";
static final String VERSION_KEY = "version";
static final String ON_FAILURE_KEY = "on_failure";
private final String id;
@Nullable
private final String description;
@Nullable
private final Integer version;
private final CompoundProcessor compoundProcessor;
public Pipeline(String id, String description, CompoundProcessor compoundProcessor) {
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
this.id = id;
this.description = description;
this.compoundProcessor = compoundProcessor;
this.version = version;
}
/**
@ -62,10 +68,21 @@ public final class Pipeline {
/**
* An optional description of what this pipeline is doing to the data gets processed by this pipeline.
*/
@Nullable
public String getDescription() {
return description;
}
/**
* An optional version stored with the pipeline so that it can be used to determine if the pipeline should be updated / replaced.
*
* @return {@code null} if not supplied.
*/
@Nullable
public Integer getVersion() {
return version;
}
/**
* Get the underlying {@link CompoundProcessor} containing the Pipeline's processors
*/
@ -100,6 +117,7 @@ public final class Pipeline {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorFactories) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
@ -114,7 +132,7 @@ public final class Pipeline {
}
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors),
Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor);
return new Pipeline(id, description, version, compoundProcessor);
}
}

View File

@ -162,7 +162,7 @@ public class IngestActionFilterTests extends ESTestCase {
PipelineStore store = mock(PipelineStore.class);
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", randomInt(), new CompoundProcessor(processor)));
executionService = new PipelineExecutionService(store, threadPool);
IngestService ingestService = mock(IngestService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);

View File

@ -43,6 +43,8 @@ import static org.hamcrest.Matchers.sameInstance;
public class SimulateExecutionServiceTests extends ESTestCase {
private final Integer version = randomBoolean() ? randomInt() : null;
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private IngestDocument ingestDocument;
@ -65,7 +67,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
@ -90,7 +92,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
@ -103,7 +105,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {});
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2, processor3));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
@ -127,7 +129,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description",
Pipeline pipeline = new Pipeline("_id", "_description", version,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
@ -163,7 +165,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
@ -179,7 +181,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
@ -194,7 +196,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItemWithFailure() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
assertThat(processor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));

View File

@ -54,7 +54,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
public void init() throws IOException {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> registry =
Collections.singletonMap("mock_processor", (factories, tag, config) -> processor);
store = mock(PipelineStore.class);

View File

@ -60,6 +60,7 @@ import static org.mockito.Mockito.when;
public class PipelineExecutionServiceTests extends ESTestCase {
private final Integer version = randomBoolean() ? randomInt() : null;
private PipelineStore store;
private PipelineExecutionService executionService;
@ -89,7 +90,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteBulkPipelineDoesNotExist() {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@ -122,7 +123,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteSuccess() throws Exception {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
@ -136,7 +137,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteEmptyPipeline() throws Exception {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
when(processor.getProcessors()).thenReturn(Collections.emptyList());
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@ -165,7 +166,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}
return null;
}).when(processor).execute(any());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
@ -189,7 +190,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteFailure() throws Exception {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
@ -209,7 +210,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
@ -226,7 +227,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -247,7 +248,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(false, Collections.singletonList(onFailureProcessor),
Collections.singletonList(onFailureOnFailureProcessor))));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -264,7 +265,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteSetTTL() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "5d"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, new CompoundProcessor(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
@ -280,7 +281,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteSetInvalidTTL() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "abc"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, new CompoundProcessor(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
@ -293,12 +294,14 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}
public void testExecuteProvidedTTL() throws Exception {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", mock(CompoundProcessor.class)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, mock(CompoundProcessor.class)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id")
.source(Collections.emptyMap())
.ttl(1000L);
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
@ -334,9 +337,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
Exception error = new RuntimeException();
doThrow(error).when(processor).execute(any());
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, processor));
@SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class);
executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
@ -355,7 +360,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
bulkRequest.add(indexRequest);
}
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor()));
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, new CompoundProcessor()));
@SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@ -375,15 +380,17 @@ public class PipelineExecutionServiceTests extends ESTestCase {
assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, new CompoundProcessor(mock(Processor.class))));
when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, new CompoundProcessor(mock(Processor.class))));
when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, version, new CompoundProcessor(mock(Processor.class))));
when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, null, new CompoundProcessor(mock(Processor.class))));
Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}")));
configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}")));
executionService.updatePipelineStats(new IngestMetadata(configurationMap));
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
IndexRequest indexRequest = new IndexRequest("_index");

View File

@ -20,8 +20,6 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
@ -34,16 +32,19 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class PipelineFactoryTests extends ESTestCase {
private final Integer version = randomBoolean() ? randomInt() : null;
private final String versionString = version != null ? Integer.toString(version) : null;
public void testCreate() throws Exception {
Map<String, Object> processorConfig0 = new HashMap<>();
Map<String, Object> processorConfig1 = new HashMap<>();
processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Pipeline.Factory factory = new Pipeline.Factory();
@ -51,6 +52,7 @@ public class PipelineFactoryTests extends ESTestCase {
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
assertThat(pipeline.getProcessors().size(), equalTo(2));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor"));
assertThat(pipeline.getProcessors().get(0).getTag(), equalTo("first-processor"));
@ -61,6 +63,7 @@ public class PipelineFactoryTests extends ESTestCase {
public void testCreateWithNoProcessorsField() throws Exception {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
Pipeline.Factory factory = new Pipeline.Factory();
try {
factory.create("_id", pipelineConfig, Collections.emptyMap());
@ -73,11 +76,13 @@ public class PipelineFactoryTests extends ESTestCase {
public void testCreateWithEmptyProcessorsField() throws Exception {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList());
Pipeline.Factory factory = new Pipeline.Factory();
Pipeline pipeline = factory.create("_id", pipelineConfig, null);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
assertThat(pipeline.getProcessors(), is(empty()));
}
@ -85,6 +90,7 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
@ -92,6 +98,7 @@ public class PipelineFactoryTests extends ESTestCase {
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor"));
assertThat(pipeline.getOnFailureProcessors().size(), equalTo(1));
@ -102,6 +109,7 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
Pipeline.Factory factory = new Pipeline.Factory();
@ -115,6 +123,7 @@ public class PipelineFactoryTests extends ESTestCase {
processorConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
@ -130,12 +139,14 @@ public class PipelineFactoryTests extends ESTestCase {
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getOnFailureProcessors().size(), equalTo(0));
@ -149,6 +160,7 @@ public class PipelineFactoryTests extends ESTestCase {
processorConfig.put("unused", "value");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
@ -162,12 +174,14 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound"));
}
@ -177,7 +191,7 @@ public class PipelineFactoryTests extends ESTestCase {
CompoundProcessor processor1 = new CompoundProcessor(testProcessor, testProcessor);
CompoundProcessor processor2 =
new CompoundProcessor(false, Collections.singletonList(testProcessor), Collections.singletonList(testProcessor));
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
List<Processor> flattened = pipeline.flattenAllProcessors();
assertThat(flattened.size(), equalTo(4));
}

View File

@ -27,6 +27,96 @@
id: "my_pipeline"
---
"Test Put Versioned Pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"version": 10,
"processors": [ ]
}
- match: { acknowledged: true }
- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 10 }
# Lower version
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"version": 9,
"processors": [ ]
}
- match: { acknowledged: true }
- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 9 }
# Higher version
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"version": 6789,
"processors": [ ]
}
- match: { acknowledged: true }
- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 6789 }
# No version
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [ ]
}
- match: { acknowledged: true }
- do:
ingest.get_pipeline:
id: "my_pipeline"
- is_false: my_pipeline.version
# Coming back with a version
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"version": 5385,
"processors": [ ]
}
- match: { acknowledged: true }
- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 5385 }
# Able to delete the versioned pipeline
- do:
ingest.delete_pipeline:
id: "my_pipeline"
- match: { acknowledged: true }
- do:
catch: missing
ingest.get_pipeline:
id: "my_pipeline"
---
"Test Get All Pipelines":
- do:
ingest.put_pipeline: