NIFI-9419 ParseCEF - Upgraded parcefone and supported empty extensions

- Upgraded com.fluenda:parcefone from 2.0.0 to 2.1.0
- Added Accept empty extensions property to ParseCEF

This closes #5555

Co-authored-by: David Handermann <exceptionfactory@apache.org>
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2021-11-29 18:45:54 +01:00 committed by exceptionfactory
parent 6a6b755aaa
commit e6b573e79e
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 49 additions and 5 deletions

View File

@ -147,6 +147,16 @@ public class ParseCEF extends AbstractProcessor {
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor ACCEPT_EMPTY_EXTENSIONS = new PropertyDescriptor.Builder()
.name("ACCEPT_EMPTY_EXTENSIONS")
.displayName("Accept empty extensions")
.description("If set to true, empty extensions will be accepted and will be associated to a null value.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor VALIDATE_DATA = new PropertyDescriptor.Builder()
.name("VALIDATE_DATA")
.displayName("Validate the CEF event")
@ -200,6 +210,7 @@ public class ParseCEF extends AbstractProcessor {
properties.add(FIELDS_DESTINATION);
properties.add(APPEND_RAW_MESSAGE_TO_JSON);
properties.add(INCLUDE_CUSTOM_EXTENSIONS);
properties.add(ACCEPT_EMPTY_EXTENSIONS);
properties.add(VALIDATE_DATA);
properties.add(TIME_REPRESENTATION);
properties.add(DATETIME_REPRESENTATION);
@ -262,12 +273,13 @@ public class ParseCEF extends AbstractProcessor {
// validator failed to identify an invalid Locale
final Locale parcefoneLocale = Locale.forLanguageTag(context.getProperty(DATETIME_REPRESENTATION).getValue());
final boolean validateData = context.getProperty(VALIDATE_DATA).asBoolean();
event = parser.parse(buffer, validateData, parcefoneLocale);
final boolean acceptEmptyExtensions = context.getProperty(ACCEPT_EMPTY_EXTENSIONS).asBoolean();
event = parser.parse(buffer, validateData, acceptEmptyExtensions, parcefoneLocale);
} catch (Exception e) {
// This should never trigger but adding in here as a fencing mechanism to
// address possible ParCEFone bugs.
getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] {e, flowFile});
getLogger().error("CEF Parsing Failed: {}", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
return;
}
@ -339,7 +351,7 @@ public class ParseCEF extends AbstractProcessor {
session.transfer(flowFile, REL_SUCCESS);
} catch (CEFHandlingException e) {
// The flowfile has failed parsing & validation, routing to failure and committing
getLogger().error("Failed to parse {} as a CEF message due to {}; routing to failure", new Object[] {flowFile, e});
getLogger().error("Reading CEF Event Failed: {}", flowFile, e);
// Create a provenance event recording the routing to failure
session.getProvenanceReporter().route(flowFile, REL_FAILURE);
session.transfer(flowFile, REL_FAILURE);
@ -379,6 +391,7 @@ public class ParseCEF extends AbstractProcessor {
return new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation(subject + " cannot be empty").build();
}
final Locale testLocale = Locale.forLanguageTag(input);
final Locale[] availableLocales = Locale.getAvailableLocales();
@ -389,7 +402,6 @@ public class ParseCEF extends AbstractProcessor {
.explanation(input + " is not a valid locale format.").build();
} else {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -342,6 +343,37 @@ public class TestParseCEF {
Assert.assertEquals(200, extension.get("http_response").asInt());
}
@Test
public void testAcceptEmptyExtensions() throws Exception {
String sample3 = "CEF:0|TestVendor|TestProduct|TestVersion|TestEventClassID|TestName|Low|" +
"rt=Feb 09 2015 00:27:43 UTC cn3Label=Test Long cn3= " +
"cfp1=1.234 cfp1Label=Test FP Number smac=00:00:0c:07:ac:00 " +
"c6a3=2001:cdba::3257:9652 c6a3Label=Test IPv6 cs1Label=Test String cs1=test test test chocolate " +
"destinationTranslatedAddress=123.123.123.123 " +
"deviceCustomDate1=Feb 06 2015 13:27:43 " +
"dpt= agt= dlat=";
final TestRunner runner = TestRunners.newTestRunner(new ParseCEF());
runner.setProperty(ParseCEF.FIELDS_DESTINATION, ParseCEF.DESTINATION_CONTENT);
runner.setProperty(ParseCEF.TIME_REPRESENTATION, ParseCEF.UTC);
runner.setProperty(ParseCEF.INCLUDE_CUSTOM_EXTENSIONS, "true");
runner.setProperty(ParseCEF.ACCEPT_EMPTY_EXTENSIONS, "true");
runner.setProperty(ParseCEF.VALIDATE_DATA, "false");
runner.enqueue(sample3.getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(ParseCEF.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseCEF.REL_SUCCESS).get(0);
byte [] rawJson = mff.toByteArray();
JsonNode results = new ObjectMapper().readTree(rawJson);
JsonNode extensions = results.get("extension");
Assert.assertTrue(extensions.has("cn3"));
Assert.assertTrue(extensions.get("cn3").isNull());
}
@Test
public void testDataValidation() throws Exception {
String invalidEvent = sample1 + " proto=ICMP"; // according to the standard, proto can be either tcp or udp.

View File

@ -316,7 +316,7 @@
<dependency>
<groupId>com.fluenda</groupId>
<artifactId>parcefone</artifactId>
<version>2.0.0</version>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.github.wnameless.json</groupId>