This commit is contained in:
Mark Payne 2016-03-23 11:24:28 -04:00
commit f7ecb47e29
2 changed files with 16 additions and 6 deletions

View File

@ -169,8 +169,18 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
@Override @Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final int flowFilesBinned = binFlowFiles(context, sessionFactory); final int totalBinCount = binManager.getBinCount() + readyBins.size();
getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned}); final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
final int flowFilesBinned;
if (totalBinCount < maxBinCount) {
flowFilesBinned = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
} else {
flowFilesBinned = 0;
getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
+ "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
}
if (!isScheduled()) { if (!isScheduled()) {
return; return;
@ -194,7 +204,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now. // bins. So we may as well expire it now.
if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) { if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final Bin bin = binManager.removeOldestBin(); final Bin bin = binManager.removeOldestBin();
if (bin != null) { if (bin != null) {
added++; added++;

View File

@ -323,9 +323,6 @@ public class GetKafka extends AbstractProcessor {
@OnScheduled @OnScheduled
public void schedule(ProcessContext context) { public void schedule(ProcessContext context) {
this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
if (this.executor == null || this.executor.isShutdown()) {
this.executor = Executors.newCachedThreadPool();
}
} }
@Override @Override
@ -335,6 +332,9 @@ public class GetKafka extends AbstractProcessor {
* of onTrigger. Will be reset to 'false' in the event of exception * of onTrigger. Will be reset to 'false' in the event of exception
*/ */
synchronized (this.consumerStreamsReady) { synchronized (this.consumerStreamsReady) {
if (this.executor == null || this.executor.isShutdown()) {
this.executor = Executors.newCachedThreadPool();
}
if (!this.consumerStreamsReady.get()) { if (!this.consumerStreamsReady.get()) {
Future<Void> f = this.executor.submit(new Callable<Void>() { Future<Void> f = this.executor.submit(new Callable<Void>() {
@Override @Override