[INGEST] Added processor type and tag header to error when processor type isn't available on node.

[INGEST] Accumulate any potential other processor parse errors before failing instead of failing upon first processor parsing error.
This commit is contained in:
Martijn van Groningen 2017-02-27 09:51:50 +01:00
parent a54daade33
commit 1d3f6c463c
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
5 changed files with 57 additions and 33 deletions

View File

@ -244,15 +244,24 @@ public final class ConfigurationUtils {
public static List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs,
Map<String, Processor.Factory> processorFactories) throws Exception {
Exception exception = null;
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
try {
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
}
}
}
if (exception != null) {
throw exception;
}
return processors;
}

View File

@ -164,12 +164,12 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories);
List<IllegalArgumentException> exceptions = new ArrayList<>();
List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
if (entry.getValue().containsProcessor(processor.getType()) == false) {
String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
exceptions.add(new IllegalArgumentException(message));
exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message));
}
}
}

View File

@ -19,6 +19,10 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -26,11 +30,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ -110,15 +111,33 @@ public class ConfigurationUtilsTests extends ESTestCase {
Map<String, Object> unknownTaggedConfig = new HashMap<>();
unknownTaggedConfig.put("tag", "my_unknown");
config.add(Collections.singletonMap("unknown_processor", unknownTaggedConfig));
try {
ConfigurationUtils.readProcessorConfigs(config, registry);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]"));
assertThat(e.getHeader("processor_tag"), equalTo(Collections.singletonList("my_unknown")));
assertThat(e.getHeader("processor_type"), equalTo(Collections.singletonList("unknown_processor")));
assertThat(e.getHeader("property_name"), is(nullValue()));
}
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> ConfigurationUtils.readProcessorConfigs(config, registry));
assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]"));
assertThat(e.getHeader("processor_tag"), equalTo(Collections.singletonList("my_unknown")));
assertThat(e.getHeader("processor_type"), equalTo(Collections.singletonList("unknown_processor")));
assertThat(e.getHeader("property_name"), is(nullValue()));
List<Map<String, Map<String, Object>>> config2 = new ArrayList<>();
unknownTaggedConfig = new HashMap<>();
unknownTaggedConfig.put("tag", "my_unknown");
config2.add(Collections.singletonMap("unknown_processor", unknownTaggedConfig));
Map<String, Object> secondUnknonwTaggedConfig = new HashMap<>();
secondUnknonwTaggedConfig.put("tag", "my_second_unknown");
config2.add(Collections.singletonMap("second_unknown_processor", secondUnknonwTaggedConfig));
e = expectThrows(ElasticsearchParseException.class, () -> ConfigurationUtils.readProcessorConfigs(config2, registry));
assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]"));
assertThat(e.getHeader("processor_tag"), equalTo(Collections.singletonList("my_unknown")));
assertThat(e.getHeader("processor_type"), equalTo(Collections.singletonList("unknown_processor")));
assertThat(e.getHeader("property_name"), is(nullValue()));
assertThat(e.getSuppressed().length, equalTo(1));
assertThat(e.getSuppressed()[0], instanceOf(ElasticsearchParseException.class));
ElasticsearchParseException e2 = (ElasticsearchParseException) e.getSuppressed()[0];
assertThat(e2.getMessage(), equalTo("No processor type exists with name [second_unknown_processor]"));
assertThat(e2.getHeader("processor_tag"), equalTo(Collections.singletonList("my_second_unknown")));
assertThat(e2.getHeader("processor_type"), equalTo(Collections.singletonList("second_unknown_processor")));
assertThat(e2.getHeader("property_name"), is(nullValue()));
}
}

View File

@ -22,8 +22,8 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -72,7 +72,7 @@ public class IngestProcessorNotInstalledOnAllNodesIT extends ESIntegTestCase {
try {
client().admin().cluster().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get();
fail("exception expected");
} catch (IllegalArgumentException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("Processor type [test] is not installed on node"));
}
}

View File

@ -70,7 +70,7 @@ public class PipelineStoreTests extends ESTestCase {
@Override
public String getTag() {
return null;
return tag;
}
};
});
@ -89,7 +89,7 @@ public class PipelineStoreTests extends ESTestCase {
@Override
public String getTag() {
return null;
return tag;
}
};
});
@ -335,7 +335,8 @@ public class PipelineStoreTests extends ESTestCase {
public void testValidate() throws Exception {
PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}},{\"remove\" : {\"field\": \"_field\"}}]}"),
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," +
"{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"),
XContentType.JSON);
DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(),
@ -346,12 +347,11 @@ public class PipelineStoreTests extends ESTestCase {
ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove"))));
ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set"))));
try {
store.validatePipeline(ingestInfos, putRequest);
fail("exception expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Processor type [remove] is not installed on node [" + node2 + "]"));
}
ElasticsearchParseException e =
expectThrows(ElasticsearchParseException.class, () -> store.validatePipeline(ingestInfos, putRequest));
assertEquals("Processor type [remove] is not installed on node [" + node2 + "]", e.getMessage());
assertEquals("remove", e.getHeader("processor_type").get(0));
assertEquals("tag2", e.getHeader("processor_tag").get(0));
ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove"))));
store.validatePipeline(ingestInfos, putRequest);
@ -360,12 +360,8 @@ public class PipelineStoreTests extends ESTestCase {
public void testValidateNoIngestInfo() throws Exception {
PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON);
try {
store.validatePipeline(Collections.emptyMap(), putRequest);
fail("exception expected");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), equalTo("Ingest info is empty"));
}
Exception e = expectThrows(IllegalStateException.class, () -> store.validatePipeline(Collections.emptyMap(), putRequest));
assertEquals("Ingest info is empty", e.getMessage());
DiscoveryNode discoveryNode = new DiscoveryNode("_node_id", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), Version.CURRENT);