From 45f82dc855177702a0c1a0e4c38af334b713c278 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 30 Jun 2017 00:03:53 +0200 Subject: [PATCH] NIFI-4111 - NiFi shutdown Fixed threads shutdown so that NiFi can shutdown gracefully NIFI-4111 - Review - Handling SocketRemoteSiteListener (RAW S2S) This closes #1963. Signed-off-by: Koji Kawamura --- .../nifi/remote/HttpRemoteSiteListener.java | 8 +++++--- .../nifi/remote/SocketRemoteSiteListener.java | 18 +++++++++++++++--- .../nifi/http/StandardHttpContextMap.java | 12 +++++++++++- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java index c9c523ef26..deb25a73e7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java @@ -54,11 +54,9 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { private HttpRemoteSiteListener(final NiFiProperties nifiProperties) { super(); taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { - private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - @Override public Thread newThread(final Runnable r) { - final Thread thread = defaultFactory.newThread(r); + final Thread thread = Executors.defaultThreadFactory().newThread(r); thread.setName("Http Site-to-Site Transaction Maintenance"); thread.setDaemon(true); return thread; @@ -160,6 +158,10 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { @Override public void stop() { + if(taskExecutor != null) { + logger.debug("Stopping Http Site-to-Site Transaction Maintenance task..."); + taskExecutor.shutdown(); + } if (transactionMaintenanceTask != null) { logger.debug("Stopping transactionMaintenanceTask..."); transactionMaintenanceTask.cancel(true); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index a367e9eee2..2fae669a01 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -17,6 +17,7 @@ package org.apache.nifi.remote; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.exception.BadRequestException; @@ -29,6 +30,7 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSessio import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.ServerProtocol; +import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,12 +48,12 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.nifi.remote.cluster.ClusterNodeInformation; -import org.apache.nifi.util.NiFiProperties; public class SocketRemoteSiteListener implements RemoteSiteListener { @@ -86,6 +88,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { @Override public void start() throws IOException { final boolean secure = (sslContext != null); + final List threads = new ArrayList(); final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(true); @@ -132,8 +135,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { LOG.trace("Got connection"); if (stopped.get()) { - return; + break; } + final Socket socket = acceptedSocket; final SocketChannel socketChannel = socket.getChannel(); final Thread thread = new Thread(new Runnable() { @@ -304,6 +308,14 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { thread.setName("Site-to-Site Worker Thread-" + (threadCount++)); LOG.debug("Handing connection to {}", thread); thread.start(); + threads.add(thread); + threads.removeIf(t -> !t.isAlive()); + } + + for(Thread thread : threads) { + if(thread != null) { + thread.interrupt(); + } } } }); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java index 9da357d019..88ce51a953 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; @@ -35,6 +36,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @@ -81,13 +83,21 @@ public class StandardHttpContextMap extends AbstractControllerService implements @OnEnabled public void onConfigured(final ConfigurationContext context) { maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger(); - executor = Executors.newSingleThreadScheduledExecutor(); + executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setName("StandardHttpContextMap-" + getIdentifier()); + return thread; + } + }); maxRequestNanos = context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS); final long scheduleNanos = maxRequestNanos / 2; executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS); } + @OnShutdown @OnDisabled public void cleanup() { if (executor != null) {