From 2ee66de1a6f9afb495c6e6565f4d4ba9cff68f88 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 18 Nov 2016 15:21:56 -0500 Subject: [PATCH] NIFI-3066: Create unit test to reproduce bug that results in IllegalArgumentException: Cannot migrate FlowFiles from a Process Session to itself NIFI-3066: Ensure that when a Bin is created, it is always created with its own new session This closes #1245 --- .../nifi/processor/util/bin/BinFiles.java | 2 +- .../processors/standard/TestMergeContent.java | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 26724917bb..67e37c28d8 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -278,7 +278,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { for (final Map.Entry> entry : flowFileGroups.entrySet()) { final Set unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory); for (final FlowFile flowFile : unbinned) { - Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null); + Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null); bin.offer(flowFile, session); this.readyBins.add(bin); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index 3a6d07c4f3..6590d47888 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -66,6 +66,32 @@ public class TestMergeContent { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG"); } + /** + * This test will verify that if we have a FlowFile larger than the Max Size for a Bin, it will go into its + * own bin and immediately be processed as its own bin. + */ + @Test + public void testFlowFileLargerThanBin() { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK); + runner.setProperty(MergeContent.MIN_ENTRIES, "2"); + runner.setProperty(MergeContent.MAX_ENTRIES, "2"); + runner.setProperty(MergeContent.MIN_SIZE, "1 KB"); + runner.setProperty(MergeContent.MAX_SIZE, "5 KB"); + + runner.enqueue(new byte[1026]); // add flowfile that fits within the bin limits + runner.enqueue(new byte[1024 * 6]); // add flowfile that is larger than the bin limit + runner.run(2); // run twice so that we have a chance to create two bins (though we shouldn't create 2, because only 1 bin will be full) + + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 1); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + + // Queue should not be empty because the first FlowFile will be transferred back to the input queue + // when we run out @OnStopped logic, since it won't be transferred to any bin. + runner.assertQueueNotEmpty(); + } + @Test public void testSimpleAvroConcat() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent());