mirror of https://github.com/apache/nifi.git
NIFI-1669: Ensure that we take into account the number of 'ready bins' when determining whether or not we have reached out Max Bin threshold for BinFiles
Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
423b333b71
commit
f120952ab7
|
@ -169,8 +169,18 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
|||
|
||||
@Override
|
||||
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
|
||||
final int flowFilesBinned = binFlowFiles(context, sessionFactory);
|
||||
getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
|
||||
final int totalBinCount = binManager.getBinCount() + readyBins.size();
|
||||
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
|
||||
final int flowFilesBinned;
|
||||
|
||||
if (totalBinCount < maxBinCount) {
|
||||
flowFilesBinned = binFlowFiles(context, sessionFactory);
|
||||
getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
|
||||
} else {
|
||||
flowFilesBinned = 0;
|
||||
getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
|
||||
+ "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
|
||||
}
|
||||
|
||||
if (!isScheduled()) {
|
||||
return;
|
||||
|
@ -194,7 +204,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
|||
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
|
||||
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
|
||||
// bins. So we may as well expire it now.
|
||||
if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
|
||||
if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
|
||||
final Bin bin = binManager.removeOldestBin();
|
||||
if (bin != null) {
|
||||
added++;
|
||||
|
|
Loading…
Reference in New Issue