NIFI-1436 This closes #189. Combining stop() and close() into a single method to simplify, and adding checks on stopped flag in the run method of SocketChannelDispatcher and DatagramChannelDispatcher to ensure the run() method exits as soon as possible upon close() being called

NIFI-1436 Adding synchronization on keys set in close() method  based on Selector JavaDoc

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Bryan Bende 2016-01-25 09:44:24 -05:00 committed by joewitt
parent 925138b6c4
commit b7f7e6ed80
6 changed files with 41 additions and 38 deletions

View File

@ -212,7 +212,6 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
@OnUnscheduled @OnUnscheduled
public void onUnscheduled() { public void onUnscheduled() {
if (dispatcher != null) { if (dispatcher != null) {
dispatcher.stop();
dispatcher.close(); dispatcher.close();
} }
} }

View File

@ -38,11 +38,6 @@ public interface ChannelDispatcher extends Runnable {
*/ */
int getPort(); int getPort();
/**
* Stops the main dispatcher thread.
*/
void stop();
/** /**
* Closes all listeners and stops all handler threads. * Closes all listeners and stops all handler threads.
*/ */

View File

@ -65,8 +65,10 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
@Override @Override
public void open(final int port, int maxBufferSize) throws IOException { public void open(final int port, int maxBufferSize) throws IOException {
stopped = false;
datagramChannel = DatagramChannel.open(); datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false); datagramChannel.configureBlocking(false);
if (maxBufferSize > 0) { if (maxBufferSize > 0) {
datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF); final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
@ -87,9 +89,11 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
while (!stopped) { while (!stopped) {
try { try {
int selected = selector.select(); int selected = selector.select();
if (selected > 0){ // if stopped the selector could already be closed which would result in a ClosedSelectorException
if (selected > 0 && !stopped) {
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
while (selectorKeys.hasNext()) { // if stopped we don't want to modify the keys because close() may still be in progress
while (selectorKeys.hasNext() && !stopped) {
SelectionKey key = selectorKeys.next(); SelectionKey key = selectorKeys.next();
selectorKeys.remove(); selectorKeys.remove();
if (!key.isValid()) { if (!key.isValid()) {
@ -140,14 +144,12 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort(); return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort();
} }
@Override
public void stop() {
selector.wakeup();
stopped = true;
}
@Override @Override
public void close() { public void close() {
stopped = true;
if (selector != null) {
selector.wakeup();
}
IOUtils.closeQuietly(selector); IOUtils.closeQuietly(selector);
IOUtils.closeQuietly(datagramChannel); IOUtils.closeQuietly(datagramChannel);
} }

View File

@ -91,7 +91,8 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
@Override @Override
public void open(final int port, int maxBufferSize) throws IOException { public void open(final int port, int maxBufferSize) throws IOException {
this.executor = Executors.newFixedThreadPool(maxConnections); stopped = false;
executor = Executors.newFixedThreadPool(maxConnections);
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); serverSocketChannel.configureBlocking(false);
@ -114,9 +115,11 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
while (!stopped) { while (!stopped) {
try { try {
int selected = selector.select(); int selected = selector.select();
if (selected > 0){ // if stopped the selector could already be closed which would result in a ClosedSelectorException
if (selected > 0 && !stopped){
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
while (selectorKeys.hasNext()){ // if stopped we don't want to modify the keys because close() may still be in progress
while (selectorKeys.hasNext() && !stopped) {
SelectionKey key = selectorKeys.next(); SelectionKey key = selectorKeys.next();
selectorKeys.remove(); selectorKeys.remove();
if (!key.isValid()){ if (!key.isValid()){
@ -196,28 +199,34 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
return 0; return 0;
} }
@Override
public void stop() {
stopped = true;
selector.wakeup();
}
@Override @Override
public void close() { public void close() {
executor.shutdown(); stopped = true;
try { if (selector != null) {
// Wait a while for existing tasks to terminate selector.wakeup();
if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
} }
for(SelectionKey key : selector.keys()){
IOUtils.closeQuietly(key.channel()); if (executor != null) {
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
if (selector != null) {
synchronized (selector.keys()) {
for (SelectionKey key : selector.keys()) {
IOUtils.closeQuietly(key.channel());
}
}
} }
IOUtils.closeQuietly(selector); IOUtils.closeQuietly(selector);
} }

View File

@ -309,7 +309,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@OnUnscheduled @OnUnscheduled
public void onUnscheduled() { public void onUnscheduled() {
if (channelDispatcher != null) { if (channelDispatcher != null) {
channelDispatcher.stop();
channelDispatcher.close(); channelDispatcher.close();
} }
} }

View File

@ -165,7 +165,6 @@ public class TestRELPSocketChannelHandler {
} finally { } finally {
// stop the dispatcher thread and ensure we shut down handler threads // stop the dispatcher thread and ensure we shut down handler threads
dispatcher.stop();
dispatcher.close(); dispatcher.close();
} }
} }