NIFI-4735: ParseEVTX only outputs one event per chunk

This change is based on https://github.com/apache/nifi/pull/2489

I have reproduced the issue with some additional test cases and test files, then applied the original fix.

commit message from the original change:
Updated the EVTX FileHeader class to correctly check if there are more chunks in the file. Previously this would not process the last chunk.

Updated the EVTX ChunkHeader class to correctly check if there are additional records in the chunk. Previously this would only process the first record of each chunk. It was using the fileLastRecordNumber where it should have been using the logLastRecordNumber value.

Updated the EVTX unit tests to have the correct expected number of events and use the logLastRecordNumber.

Refactored duplicated test code and replaced magic numbers with named constants.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2489
This closes #3379
This commit is contained in:
Ferenc Szabó 2019-03-20 12:28:08 +01:00 committed by Matthew Burgess
parent d35d15cdda
commit 48a6c81fa2
6 changed files with 43 additions and 10 deletions

View File

@ -158,7 +158,7 @@ public class ChunkHeader extends Block {
}
public boolean hasNext() {
return fileLastRecordNumber.compareTo(recordNumber) > 0;
return logLastRecordNumber.compareTo(recordNumber) > 0;
}
public String getString(int offset) {

View File

@ -141,10 +141,10 @@ public class FileHeader extends Block {
/**
* Tests whether there are more chunks
* @return true iff there are chunks left
* @return true if there are chunks left
*/
public boolean hasNext() {
return count < chunkCount;
return count <= chunkCount;
}
/**

View File

@ -74,6 +74,7 @@ public class ParseEvtxTest {
public static final String USER_DATA = "UserData";
public static final String EVENT_DATA = "EventData";
public static final Set DATA_TAGS = new HashSet<>(Arrays.asList(EVENT_DATA, USER_DATA));
public static final int EXPECTED_SUCCESSFUL_EVENT_COUNT = 1053;
@Mock
FileHeaderFactory fileHeaderFactory;
@ -366,7 +367,7 @@ public class ParseEvtxTest {
assertEquals(1, failureFlowFiles.size());
validateFlowFiles(failureFlowFiles);
// We expect the same number of records to come out no matter the granularity
assertEquals(960, validateFlowFiles(failureFlowFiles));
assertEquals(EXPECTED_SUCCESSFUL_EVENT_COUNT, validateFlowFiles(failureFlowFiles));
// Whole file fails if there is a failure parsing
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ParseEvtx.REL_SUCCESS);
@ -399,10 +400,10 @@ public class ParseEvtxTest {
assertEquals(1, failureFlowFiles.size());
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ParseEvtx.REL_SUCCESS);
assertEquals(8, successFlowFiles.size());
assertEquals(9, successFlowFiles.size());
// We expect the same number of records to come out no matter the granularity
assertEquals(960, validateFlowFiles(successFlowFiles) + validateFlowFiles(failureFlowFiles));
assertEquals(EXPECTED_SUCCESSFUL_EVENT_COUNT, validateFlowFiles(successFlowFiles) + validateFlowFiles(failureFlowFiles));
}
@Test
@ -433,10 +434,42 @@ public class ParseEvtxTest {
// Whole file fails if there is a failure parsing
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ParseEvtx.REL_SUCCESS);
assertEquals(960, successFlowFiles.size());
assertEquals(EXPECTED_SUCCESSFUL_EVENT_COUNT, successFlowFiles.size());
// We expect the same number of records to come out no matter the granularity
assertEquals(960, validateFlowFiles(successFlowFiles));
assertEquals(EXPECTED_SUCCESSFUL_EVENT_COUNT, validateFlowFiles(successFlowFiles));
}
/**
 * Parses {@code 1344_events.evtx} with record granularity and expects 1344 flow files
 * on the success relationship (the file name suggests it holds 1344 events).
 */
@Test
public void testRecordBasedParseCorrectNumberOfFlowFiles() {
    final String evtxResource = "1344_events.evtx";
    final int expectedFlowFiles = 1344;
    testValidEvents(ParseEvtx.RECORD, evtxResource, expectedFlowFiles);
}
/**
 * Parses {@code 1344_events.evtx} with chunk granularity and expects 14 flow files
 * on the success relationship.
 */
@Test
public void testChunkBasedParseCorrectNumberOfFlowFiles() {
    final String evtxResource = "1344_events.evtx";
    final int expectedFlowFiles = 14;
    testValidEvents(ParseEvtx.CHUNK, evtxResource, expectedFlowFiles);
}
/**
 * Parses {@code 3778_events_not_exported.evtx} with record granularity and expects
 * 3778 flow files on the success relationship.
 */
@Test
public void testRecordBasedParseCorrectNumberOfFlowFilesFromAResizedFile() {
    final String evtxResource = "3778_events_not_exported.evtx";
    final int expectedFlowFiles = 3778;
    testValidEvents(ParseEvtx.RECORD, evtxResource, expectedFlowFiles);
}
/**
 * Parses {@code 3778_events_not_exported.evtx} with chunk granularity and expects
 * 16 flow files on the success relationship.
 */
@Test
public void testChunkBasedParseCorrectNumberOfFlowFilesFromAResizedFile() {
    final String evtxResource = "3778_events_not_exported.evtx";
    final int expectedFlowFiles = 16;
    testValidEvents(ParseEvtx.CHUNK, evtxResource, expectedFlowFiles);
}
/**
 * Runs {@link ParseEvtx} once over a classpath resource and verifies how many flow files
 * are transferred to {@link ParseEvtx#REL_SUCCESS}.
 *
 * @param granularity value to set for the {@link ParseEvtx#GRANULARITY} property
 * @param filename name of the classpath resource whose content is enqueued
 * @param expectedCount number of flow files expected on the success relationship
 * @throws IllegalStateException if the resource cannot be found on the classpath
 */
private void testValidEvents(String granularity, String filename, int expectedCount) {
    TestRunner testRunner = TestRunners.newTestRunner(ParseEvtx.class);
    testRunner.setProperty(ParseEvtx.GRANULARITY, granularity);
    Map<String, String> attributes = new HashMap<>();
    ClassLoader classLoader = this.getClass().getClassLoader();
    // Keep the stream open for the whole run and close it afterwards so the test does not leak it.
    try (InputStream resourceAsStream = classLoader.getResourceAsStream(filename)) {
        // getResourceAsStream returns null for a missing resource; fail with a clear message
        // instead of handing null to the runner.
        if (resourceAsStream == null) {
            throw new IllegalStateException("Test resource not found on classpath: " + filename);
        }
        testRunner.enqueue(resourceAsStream, attributes);
        testRunner.run();
        testRunner.assertTransferCount(ParseEvtx.REL_SUCCESS, expectedCount);
    } catch (IOException e) {
        // Only the implicit close() can raise IOException here; rethrow unchecked so the
        // method signature stays unchanged for its callers.
        throw new java.io.UncheckedIOException("Failed to close test resource: " + filename, e);
    }
}
private int validateFlowFiles(List<MockFlowFile> successFlowFiles) throws SAXException, IOException, ParserConfigurationException {

View File

@ -102,7 +102,7 @@ public class ChunkHeaderTest {
offset += 11;
}
RecordTest.putNode(testBinaryReaderBuilder, fileLastRecordNumber, new Date());
RecordTest.putNode(testBinaryReaderBuilder, logLastRecordNumber, new Date());
testBinaryReaderBuilder.put(dataBuilder.toByteArray());
@ -133,7 +133,7 @@ public class ChunkHeaderTest {
assertTrue(chunkHeader.hasNext());
Record next = chunkHeader.next();
assertEquals(fileLastRecordNumber, next.getRecordNum().intValue());
assertEquals(logLastRecordNumber, next.getRecordNum().intValue());
RootNode rootNode = next.getRootNode();
List<BxmlNode> children = rootNode.getChildren();
assertEquals(1, children.size());