NIFI-379 fixed what appears to be a faulty edge condition handling

This commit is contained in:
joewitt 2015-06-04 22:48:17 -04:00
parent b53948a6ba
commit 819b65f7e0
2 changed files with 31 additions and 12 deletions

View File

@ -171,17 +171,17 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
int binsAdded = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[]{binsAdded});
final int flowFilesBinned = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
if (!isScheduled()) {
return;
}
binsAdded += migrateBins(context);
final int binsMigrated = migrateBins(context);
final int binsProcessed = processBins(context, sessionFactory);
if (binsProcessed == 0 && binsAdded == 0) {
//If we accomplished nothing then let's yield
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
context.yield();
}
}
@ -203,7 +203,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
this.readyBins.add(bin);
}
}
return added;
}
@ -251,16 +250,16 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
int binsAdded = 0;
while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
int flowFilesBinned = 0;
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
if (!isScheduled()) {
return binsAdded;
break;
}
final ProcessSession session = sessionFactory.createSession();
FlowFile flowFile = session.get();
if (flowFile == null) {
return binsAdded;
break;
}
flowFile = this.preprocessFlowFile(context, session, flowFile);
@ -276,10 +275,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
this.readyBins.add(bin);
}
binsAdded++;
flowFilesBinned++;
}
return binsAdded;
return flowFilesBinned;
}
@OnScheduled

View File

@ -73,6 +73,26 @@ public class TestMergeContent {
bundle.assertContentEquals("Hello, World!".getBytes("UTF-8"));
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test
public void testSimpleBinaryConcatSingleBin() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.MAX_BIN_COUNT, "1");
createFlowFiles(runner);
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertContentEquals("Hello, World!".getBytes("UTF-8"));
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test
public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, InterruptedException {