From cd3567873be7586aa285bd6b27e928e320299bee Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 20 Mar 2019 12:16:16 +0900 Subject: [PATCH] NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on Added a unit test representing the fixed issue. And updated the existing testDefragment test to illustrate the remaining FlowFiles that did not meet the threshold. --- .../processors/standard/TestMergeRecord.java | 63 +++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java index c54bf2ad70..3540b04ae1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java @@ -158,27 +158,39 @@ public class TestMergeRecord { final Map<String, String> attr1 = new HashMap<>(); attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); final Map<String, String> attr2 = new HashMap<>(); attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1"); final Map<String, String> attr3 = new HashMap<>(); attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2"); + attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); final Map<String, String> attr4 = new HashMap<>(); attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); - attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2"); + attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3"); + attr4.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); + + final 
Map<String, String> attr5 = new HashMap<>(); + attr5.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr5.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3"); + attr5.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1"); runner.enqueue("Name, Age\nJohn, 35", attr1); runner.enqueue("Name, Age\nJane, 34", attr2); - runner.enqueue("Name, Age\nJake, 3", attr3); - runner.enqueue("Name, Age\nJan, 2", attr4); + runner.enqueue("Name, Age\nJay, 24", attr3); - runner.run(4); + runner.enqueue("Name, Age\nJake, 3", attr4); + runner.enqueue("Name, Age\nJan, 2", attr5); + runner.run(1); + + assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount()); runner.assertTransferCount(MergeRecord.REL_MERGED, 2); runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4); @@ -195,6 +207,49 @@ public class TestMergeRecord { } + @Test + public void testDefragmentWithMultipleRecords() { + runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT); + + final Map<String, String> attr1 = new HashMap<>(); + attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); + attr1.put("record.count", "2"); + + final Map<String, String> attr2 = new HashMap<>(); + attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1"); + attr2.put("record.count", "2"); + + final Map<String, String> attr3 = new HashMap<>(); + attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2"); + attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); + attr3.put("record.count", "2"); + + runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1); + + runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2); + + runner.enqueue("Name, Age\nJay, 24\nJade, 28", attr3); + + runner.run(1); + + assertEquals("Fragment id=2 should remain in the incoming connection", 1, 
runner.getQueueSize().getObjectCount()); + runner.assertTransferCount(MergeRecord.REL_MERGED, 1); + runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2); + + final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED); + assertEquals(1L, mffs.stream() + .filter(ff -> "4".equals(ff.getAttribute("record.count"))) + .filter(ff -> "header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new String(ff.toByteArray()))) + .count()); + + } + + @Test public void testMinSize() { runner.setProperty(MergeRecord.MIN_RECORDS, "2");