Introduce a fail processor

This commit is contained in:
Tal Levy 2015-12-23 16:20:11 -08:00
parent d14f6dbd3b
commit 9dd54f2b4f
8 changed files with 293 additions and 2 deletions

View File

@ -445,6 +445,20 @@ An example that adds the parsed date to the `timestamp` field based on the `init
} }
-------------------------------------------------- --------------------------------------------------
==== Fail processor
The Fail Processor is used to raise an exception. This is useful for when
a user expects a pipeline to fail and wishes to relay a specific message
to the requester.
[source,js]
--------------------------------------------------
{
"fail": {
"message": "an error message"
}
}
--------------------------------------------------
=== Accessing data in pipelines === Accessing data in pipelines
Processors in pipelines have read and write access to documents that pass through the pipeline. Processors in pipelines have read and write access to documents that pass through the pipeline.

View File

@ -165,8 +165,7 @@ public final class IngestDocument {
* @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist. * @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist.
*/ */
public void removeField(TemplateService.Template fieldPathTemplate) { public void removeField(TemplateService.Template fieldPathTemplate) {
Map<String, Object> model = createTemplateModel(); removeField(renderTemplate(fieldPathTemplate));
removeField(fieldPathTemplate.execute(model));
} }
/** /**
@ -422,6 +421,10 @@ public final class IngestDocument {
throw new IllegalArgumentException("field [" + path + "] of type [" + object.getClass().getName() + "] cannot be cast to [" + clazz.getName() + "]"); throw new IllegalArgumentException("field [" + path + "] of type [" + object.getClass().getName() + "] cannot be cast to [" + clazz.getName() + "]");
} }
public String renderTemplate(TemplateService.Template template) {
return template.execute(createTemplateModel());
}
private Map<String, Object> createTemplateModel() { private Map<String, Object> createTemplateModel() {
Map<String, Object> model = new HashMap<>(sourceAndMetadata); Map<String, Object> model = new HashMap<>(sourceAndMetadata);
model.put(SOURCE_KEY, sourceAndMetadata); model.put(SOURCE_KEY, sourceAndMetadata);

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.fail;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.util.Map;
/**
* Processor that raises a runtime exception with a provided
* error message.
*/
public class FailProcessor implements Processor {
public static final String TYPE = "fail";
private final TemplateService.Template message;
FailProcessor(TemplateService.Template message) {
this.message = message;
}
public TemplateService.Template getMessage() {
return message;
}
@Override
public void execute(IngestDocument document) {
throw new FailProcessorException(document.renderTemplate(message));
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory<FailProcessor> {
private final TemplateService templateService;
public Factory(TemplateService templateService) {
this.templateService = templateService;
}
@Override
public FailProcessor create(Map<String, Object> config) throws Exception {
String message = ConfigurationUtils.readStringProperty(config, "message");
return new FailProcessor(templateService.compile(message));
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.fail;
/**
* Exception class thrown by {@link FailProcessor}.
*/
public class FailProcessorException extends RuntimeException {
public FailProcessorException(String message) {
super(message);
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.ingest.processor.append.AppendProcessor; import org.elasticsearch.ingest.processor.append.AppendProcessor;
import org.elasticsearch.ingest.processor.convert.ConvertProcessor; import org.elasticsearch.ingest.processor.convert.ConvertProcessor;
import org.elasticsearch.ingest.processor.date.DateProcessor; import org.elasticsearch.ingest.processor.date.DateProcessor;
import org.elasticsearch.ingest.processor.fail.FailProcessor;
import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor; import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
import org.elasticsearch.ingest.processor.grok.GrokProcessor; import org.elasticsearch.ingest.processor.grok.GrokProcessor;
import org.elasticsearch.ingest.processor.gsub.GsubProcessor; import org.elasticsearch.ingest.processor.gsub.GsubProcessor;
@ -72,6 +73,7 @@ public class IngestModule extends AbstractModule {
addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) { for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.fail;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class FailProcessorFactoryTests extends ESTestCase {
private FailProcessor.Factory factory;
@Before
public void init() {
factory = new FailProcessor.Factory(TestTemplateService.instance());
}
public void testCreate() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("message", "error");
FailProcessor failProcessor = factory.create(config);
assertThat(failProcessor.getMessage().execute(Collections.emptyMap()), equalTo("error"));
}
public void testCreateMissingMessageField() throws Exception {
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [message] is missing"));
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.fail;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class FailProcessorTests extends ESTestCase {
public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String message = randomAsciiOfLength(10);
Processor processor = new FailProcessor(new TestTemplateService.MockTemplate(message));
try {
processor.execute(ingestDocument);
fail("fail processor should throw an exception");
} catch (FailProcessorException e) {
assertThat(e.getMessage(), equalTo(message));
}
}
}

View File

@ -0,0 +1,68 @@
---
"Test Fail Processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"fail" : {
"message" : "error_message"
}
}
]
}
- match: { _id: "my_pipeline" }
- do:
catch: request
ingest.index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {}
---
"Test fail with on_failure":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"fail" : {
"message" : "error",
"on_failure" : [
{
"set" : {
"field" : "error_message",
"value" : "fail_processor_ran"
}
}
]
}
}
]
}
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {}
- do:
get:
index: test
type: test
id: 1
- match: { _source.error_message: "fail_processor_ran" }