From 9324a2a74200e96dce9574e1b0105e9ac4a46871 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 9 Oct 2017 16:17:40 -0400 Subject: [PATCH] NIFI-4476 Improving logic for determining when to yield in PutTCP/UDP/Syslog/Splunk Signed-off-by: Pierre Villard This closes #2204. --- .../util/put/AbstractPutEventProcessor.java | 36 +++++++++++++++++- .../nifi/processors/splunk/PutSplunk.java | 7 +++- .../nifi/processors/standard/PutSyslog.java | 38 +++++++++++++++++-- .../nifi/processors/standard/PutTCP.java | 7 +++- .../nifi/processors/standard/PutUDP.java | 7 +++- 5 files changed, 85 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index a246272c90..d09fe06866 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -252,21 +252,28 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr * Close any senders that haven't been active with in the given threshold * * @param idleThreshold the threshold to consider a sender as idle + * @return the number of connections that were closed as a result of being idle */ - protected void pruneIdleSenders(final long idleThreshold) { + protected PruneResult pruneIdleSenders(final long idleThreshold) { + int numClosed = 0; + int numConsidered = 0; + long currentTime = System.currentTimeMillis(); final List putBack = new ArrayList<>(); // if a connection hasn't been used with in the threshold then it gets closed ChannelSender sender; while ((sender = senderPool.poll()) != null) { + numConsidered++; if (currentTime > (sender.getLastUsed() + idleThreshold)) { getLogger().debug("Closing idle connection..."); sender.close(); + numClosed++; } else { putBack.add(sender); } } + // re-queue senders that weren't idle, but if the queue is full then close the sender for (ChannelSender putBackSender : putBack) { boolean returned = senderPool.offer(putBackSender); @@ -274,6 +281,8 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr putBackSender.close(); } } + + return new PruneResult(numClosed, numConsidered); } /** @@ -371,6 +380,31 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr } } + /** + * The results from pruning connections. + */ + protected static class PruneResult { + + private final int numClosed; + + private final int numConsidered; + + public PruneResult(final int numClosed, final int numConsidered) { + this.numClosed = numClosed; + this.numConsidered = numConsidered; + } + + public int getNumClosed() { + return numClosed; + } + + public int getNumConsidered() { + return numConsidered; + } + + } + + /** * Represents a range of messages from a FlowFile. */ diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 57ea812aae..d67419299f 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -138,8 +138,11 @@ public class PutSplunk extends AbstractPutEventProcessor { final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 412d8abc95..63f17ba244 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -265,20 +265,26 @@ public class PutSyslog extends AbstractSyslogProcessor { } } - private void pruneIdleSenders(final long idleThreshold){ + private PruneResult pruneIdleSenders(final long idleThreshold){ + int numClosed = 0; + int numConsidered = 0; + long currentTime = System.currentTimeMillis(); final List putBack = new ArrayList<>(); // if a connection hasn't been used with in the threshold then it gets closed ChannelSender sender; while ((sender = senderPool.poll()) != null) { + numConsidered++; if (currentTime > (sender.getLastUsed() + idleThreshold)) { getLogger().debug("Closing idle connection..."); sender.close(); + numClosed++; } else { putBack.add(sender); } } + // re-queue senders that weren't idle, but if the queue is full then close the sender for (ChannelSender putBackSender : putBack) { boolean returned = senderPool.offer(putBackSender); @@ -286,6 +292,8 @@ public class PutSyslog extends AbstractSyslogProcessor { putBackSender.close(); } } + + return new PruneResult(numClosed, numConsidered); } @Override @@ -295,8 +303,11 @@ public class PutSyslog extends AbstractSyslogProcessor { final List flowFiles = session.get(batchSize); if (flowFiles == null || flowFiles.isEmpty()) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } @@ -394,4 +405,25 @@ public class PutSyslog extends AbstractSyslogProcessor { return false; } + private static class PruneResult { + + private final int numClosed; + + private final int numConsidered; + + public PruneResult(final int numClosed, final int numConsidered) { + this.numClosed = numClosed; + this.numConsidered = numConsidered; + } + + public int getNumClosed() { + return numClosed; + } + + public int getNumConsidered() { + return numConsidered; + } + + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java index ee3e645074..a8deab22a1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java @@ -168,8 +168,11 @@ public class PutTCP extends AbstractPutEventProcessor { final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java index af23c5441c..3157930dd5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java @@ -131,8 +131,11 @@ public class PutUDP extends AbstractPutEventProcessor { final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; }