mirror of https://github.com/apache/nifi.git
NIFI-9329 - Expose event validation in ParseCEF processor (#5477)
This commit is contained in:
parent
5766d33fce
commit
8fe7f372d6
|
@ -144,6 +144,19 @@ public class ParseCEF extends AbstractProcessor {
|
|||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.required(true)
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor VALIDATE_DATA = new PropertyDescriptor.Builder()
|
||||
.name("VALIDATE_DATA")
|
||||
.displayName("Validate the CEF event")
|
||||
.description("If set to true, the event will be validated against the CEF standard (revision 23). If the event is invalid, the "
|
||||
+ "FlowFile will be routed to the failure relationship. If this property is set to false, the event will be processed "
|
||||
+ "without validating the data.")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.required(true)
|
||||
.defaultValue("true")
|
||||
.allowableValues("true", "false")
|
||||
.build();
|
||||
|
||||
public static final String UTC = "UTC";
|
||||
|
@ -187,6 +200,7 @@ public class ParseCEF extends AbstractProcessor {
|
|||
properties.add(FIELDS_DESTINATION);
|
||||
properties.add(APPEND_RAW_MESSAGE_TO_JSON);
|
||||
properties.add(INCLUDE_CUSTOM_EXTENSIONS);
|
||||
properties.add(VALIDATE_DATA);
|
||||
properties.add(TIME_REPRESENTATION);
|
||||
properties.add(DATETIME_REPRESENTATION);
|
||||
return properties;
|
||||
|
@ -247,7 +261,8 @@ public class ParseCEF extends AbstractProcessor {
|
|||
// parcefoneLocale defaults to en_US, so this should not fail. But we force failure in case the custom
|
||||
// validator failed to identify an invalid Locale
|
||||
final Locale parcefoneLocale = Locale.forLanguageTag(context.getProperty(DATETIME_REPRESENTATION).getValue());
|
||||
event = parser.parse(buffer, true, parcefoneLocale);
|
||||
final boolean validateData = context.getProperty(VALIDATE_DATA).asBoolean();
|
||||
event = parser.parse(buffer, validateData, parcefoneLocale);
|
||||
|
||||
} catch (Exception e) {
|
||||
// This should never trigger but adding in here as a fencing mechanism to
|
||||
|
|
|
@ -342,5 +342,34 @@ public class TestParseCEF {
|
|||
Assert.assertEquals(200, extension.get("http_response").asInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataValidation() throws Exception {
|
||||
String invalidEvent = sample1 + " proto=ICMP"; // according to the standard, proto can be either tcp or udp.
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ParseCEF());
|
||||
runner.setProperty(ParseCEF.FIELDS_DESTINATION, ParseCEF.DESTINATION_CONTENT);
|
||||
runner.setProperty(ParseCEF.TIME_REPRESENTATION, ParseCEF.UTC);
|
||||
runner.enqueue(invalidEvent.getBytes());
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ParseCEF.REL_FAILURE, 1);
|
||||
|
||||
runner.clearTransferState();
|
||||
runner.setProperty(ParseCEF.VALIDATE_DATA, "false");
|
||||
runner.enqueue(invalidEvent.getBytes());
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ParseCEF.REL_SUCCESS, 1);
|
||||
|
||||
final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseCEF.REL_SUCCESS).get(0);
|
||||
|
||||
byte [] rawJson = mff.toByteArray();
|
||||
|
||||
JsonNode results = new ObjectMapper().readTree(rawJson);
|
||||
|
||||
JsonNode extension = results.get("extension");
|
||||
Assert.assertEquals("ICMP", extension.get("proto").asText());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue