NIFI-3969: Prevent merging flowfiles prematurely when all bins fill but some are already full and can be processed

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1850.
This commit is contained in:
Mark Payne 2017-05-24 08:27:06 -04:00 committed by Pierre Villard
parent bb96b0f464
commit 08b66b5b6a
2 changed files with 19 additions and 1 deletions

View File

@ -205,7 +205,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now. // bins. So we may as well expire it now.
if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) { if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final Bin bin = binManager.removeOldestBin(); final Bin bin = binManager.removeOldestBin();
if (bin != null) { if (bin != null) {
added++; added++;

View File

@ -911,6 +911,24 @@ public class TestMergeContent {
bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE); bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE);
} }
@Test
public void testLeavesSmallBinUnmerged() {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MIN_ENTRIES, "5");
runner.setProperty(MergeContent.MAX_ENTRIES, "5");
runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
for (int i = 0; i < 17; i++) {
runner.enqueue(String.valueOf(i) + "\n");
}
runner.run(5);
runner.assertTransferCount(MergeContent.REL_MERGED, 3);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 15);
assertEquals(2, runner.getQueueSize().getObjectCount());
}
private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException { private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");