NIFI-3066: Create unit test to reproduce bug that results in IllegalArgumentException: Cannot migrate FlowFiles from a Process Session to itself

NIFI-3066: Ensure that when a Bin is created, it is always created with its own new session

This closes #1245
This commit is contained in:
Mark Payne 2016-11-18 15:21:56 -05:00 committed by Oleg Zhurakousky
parent e731f09573
commit 2ee66de1a6
2 changed files with 27 additions and 1 deletions

View File

@ -278,7 +278,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) { for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory); final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
for (final FlowFile flowFile : unbinned) { for (final FlowFile flowFile : unbinned) {
Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null); Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
bin.offer(flowFile, session); bin.offer(flowFile, session);
this.readyBins.add(bin); this.readyBins.add(bin);
} }

View File

@ -66,6 +66,32 @@ public class TestMergeContent {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
} }
/**
* This test will verify that if we have a FlowFile larger than the Max Size for a Bin, it will go into its
* own bin and immediately be processed as its own bin.
*/
@Test
public void testFlowFileLargerThanBin() {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK);
runner.setProperty(MergeContent.MIN_ENTRIES, "2");
runner.setProperty(MergeContent.MAX_ENTRIES, "2");
runner.setProperty(MergeContent.MIN_SIZE, "1 KB");
runner.setProperty(MergeContent.MAX_SIZE, "5 KB");
runner.enqueue(new byte[1026]); // add flowfile that fits within the bin limits
runner.enqueue(new byte[1024 * 6]); // add flowfile that is larger than the bin limit
runner.run(2); // run twice so that we have a chance to create two bins (though we shouldn't create 2, because only 1 bin will be full)
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 1);
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
// Queue should not be empty because the first FlowFile will be transferred back to the input queue
// when we run out @OnStopped logic, since it won't be transferred to any bin.
runner.assertQueueNotEmpty();
}
@Test @Test
public void testSimpleAvroConcat() throws IOException, InterruptedException { public void testSimpleAvroConcat() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); final TestRunner runner = TestRunners.newTestRunner(new MergeContent());