From b7f7e6ed80ce1aa257cb6bcbd9add89c84549e01 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 25 Jan 2016 09:44:24 -0500 Subject: [PATCH] NIFI-1436 This closes #189. Combining stop() and close() into a single method to simplify, and adding checks on stopped flag in the run method of SocketChannelDispatcher and DatagramChannelDispatcher to ensure the run() method exits as soon as possible upon close() being called NIFI-1436 Adding synchronization on keys set in close() method based on Selector JavaDoc Signed-off-by: joewitt --- .../listen/AbstractListenEventProcessor.java | 1 - .../listen/dispatcher/ChannelDispatcher.java | 5 -- .../dispatcher/DatagramChannelDispatcher.java | 18 ++++--- .../dispatcher/SocketChannelDispatcher.java | 53 +++++++++++-------- .../processors/standard/ListenSyslog.java | 1 - .../handler/TestRELPSocketChannelHandler.java | 1 - 6 files changed, 41 insertions(+), 38 deletions(-) diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index e994d81b5b..029f6dbf93 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -212,7 +212,6 @@ public abstract class AbstractListenEventProcessor extends Abst @OnUnscheduled public void onUnscheduled() { if (dispatcher != null) { - dispatcher.stop(); dispatcher.close(); } } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java index 001ee9b484..cff230efda 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java @@ -38,11 +38,6 @@ public interface ChannelDispatcher extends Runnable { */ int getPort(); - /** - * Stops the main dispatcher thread. - */ - void stop(); - /** * Closes all listeners and stops all handler threads. */ diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java index a00a39f533..e362e15923 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java @@ -65,8 +65,10 @@ public class DatagramChannelDispatcher> impleme @Override public void open(final int port, int maxBufferSize) throws IOException { + stopped = false; datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); + if (maxBufferSize > 0) { datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF); @@ -87,9 +89,11 @@ public class DatagramChannelDispatcher> impleme while (!stopped) { try { int selected = selector.select(); - if (selected > 0){ + // if stopped the selector could already be closed which would result in a ClosedSelectorException + if (selected > 0 && !stopped) { Iterator selectorKeys = selector.selectedKeys().iterator(); - while (selectorKeys.hasNext()) { + // if stopped we don't want to modify the keys because close() may still be in progress + while (selectorKeys.hasNext() && !stopped) { SelectionKey key = selectorKeys.next(); selectorKeys.remove(); if (!key.isValid()) { @@ -140,14 +144,12 @@ public class DatagramChannelDispatcher> impleme return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort(); } - @Override - public void stop() { - selector.wakeup(); - stopped = true; - } - @Override public void close() { + stopped = true; + if (selector != null) { + selector.wakeup(); + } IOUtils.closeQuietly(selector); IOUtils.closeQuietly(datagramChannel); } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java index 6dd6345da6..da5c414b6f 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java @@ -91,7 +91,8 @@ public class SocketChannelDispatcher> implements @Override public void open(final int port, int maxBufferSize) throws IOException { - this.executor = Executors.newFixedThreadPool(maxConnections); + stopped = false; + executor = Executors.newFixedThreadPool(maxConnections); final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); @@ -114,9 +115,11 @@ public class SocketChannelDispatcher> implements while (!stopped) { try { int selected = selector.select(); - if (selected > 0){ + // if stopped the selector could already be closed which would result in a ClosedSelectorException + if (selected > 0 && !stopped){ Iterator selectorKeys = selector.selectedKeys().iterator(); - while (selectorKeys.hasNext()){ + // if stopped we don't want to modify the keys because close() may still be in progress + while (selectorKeys.hasNext() && !stopped) { SelectionKey key = selectorKeys.next(); selectorKeys.remove(); if (!key.isValid()){ @@ -196,28 +199,34 @@ public class SocketChannelDispatcher> implements return 0; } - @Override - public void stop() { - stopped = true; - selector.wakeup(); - } - @Override public void close() { - executor.shutdown(); - try { - // Wait a while for existing tasks to terminate - if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); + stopped = true; + if (selector != null) { + selector.wakeup(); } - for(SelectionKey key : selector.keys()){ - IOUtils.closeQuietly(key.channel()); + + if (executor != null) { + executor.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + if (selector != null) { + synchronized (selector.keys()) { + for (SelectionKey key : selector.keys()) { + IOUtils.closeQuietly(key.channel()); + } + } } IOUtils.closeQuietly(selector); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 2e439b4cc7..8ca7eb18f8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -309,7 +309,6 @@ public class ListenSyslog extends AbstractSyslogProcessor { @OnUnscheduled public void onUnscheduled() { if (channelDispatcher != null) { - channelDispatcher.stop(); channelDispatcher.close(); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java index 2a51ad66c7..e3516705cc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java @@ -165,7 +165,6 @@ public class TestRELPSocketChannelHandler { } finally { // stop the dispatcher thread and ensure we shut down handler threads - dispatcher.stop(); dispatcher.close(); } }