NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on

Added an unit test representing the fixed issue.
And updated existing testDefragment test to illustrate
the remaining FlowFiles those did not meet the threshold.
This commit is contained in:
Koji Kawamura 2019-03-20 12:16:16 +09:00
parent b97fbd2c89
commit cd3567873b
1 changed files with 59 additions and 4 deletions

View File

@ -158,27 +158,39 @@ public class TestMergeRecord {
final Map<String, String> attr1 = new HashMap<>();
attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
final Map<String, String> attr2 = new HashMap<>();
attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
final Map<String, String> attr3 = new HashMap<>();
attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
final Map<String, String> attr4 = new HashMap<>();
attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
attr4.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
final Map<String, String> attr5 = new HashMap<>();
attr5.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr5.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
attr5.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
runner.enqueue("Name, Age\nJohn, 35", attr1);
runner.enqueue("Name, Age\nJane, 34", attr2);
runner.enqueue("Name, Age\nJake, 3", attr3);
runner.enqueue("Name, Age\nJan, 2", attr4);
runner.enqueue("Name, Age\nJay, 24", attr3);
runner.run(4);
runner.enqueue("Name, Age\nJake, 3", attr4);
runner.enqueue("Name, Age\nJan, 2", attr5);
runner.run(1);
assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount());
runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
@ -195,6 +207,49 @@ public class TestMergeRecord {
}
@Test
public void testDefragmentWithMultipleRecords() {
runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
final Map<String, String> attr1 = new HashMap<>();
attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
attr1.put("record.count", "2");
final Map<String, String> attr2 = new HashMap<>();
attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
attr2.put("record.count", "2");
final Map<String, String> attr3 = new HashMap<>();
attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
attr3.put("record.count", "2");
runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1);
runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2);
runner.enqueue("Name, Age\nJay, 24\nJade, 28", attr3);
runner.run(1);
assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount());
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
assertEquals(1L, mffs.stream()
.filter(ff -> "4".equals(ff.getAttribute("record.count")))
.filter(ff -> "header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new String(ff.toByteArray())))
.count());
}
@Test
public void testMinSize() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");