NIFI-4111 - NiFi shutdown

Fixed threads shutdown so that NiFi can shutdown gracefully

NIFI-4111 - Review - Handling SocketRemoteSiteListener (RAW S2S)

This closes #1963.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2017-06-30 00:03:53 +02:00 committed by Koji Kawamura
parent 3906d4e1d2
commit 45f82dc855
3 changed files with 31 additions and 7 deletions

View File

@ -54,11 +54,9 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
private HttpRemoteSiteListener(final NiFiProperties nifiProperties) {
super();
taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable r) {
final Thread thread = defaultFactory.newThread(r);
final Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("Http Site-to-Site Transaction Maintenance");
thread.setDaemon(true);
return thread;
@ -160,6 +158,10 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
@Override
public void stop() {
if(taskExecutor != null) {
logger.debug("Stopping Http Site-to-Site Transaction Maintenance task...");
taskExecutor.shutdown();
}
if (transactionMaintenanceTask != null) {
logger.debug("Stopping transactionMaintenanceTask...");
transactionMaintenanceTask.cancel(true);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.remote;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.exception.BadRequestException;
@ -29,6 +30,7 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSessio
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,12 +48,12 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.util.NiFiProperties;
public class SocketRemoteSiteListener implements RemoteSiteListener {
@ -86,6 +88,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
@Override
public void start() throws IOException {
final boolean secure = (sslContext != null);
final List<Thread> threads = new ArrayList<Thread>();
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
@ -132,8 +135,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
LOG.trace("Got connection");
if (stopped.get()) {
return;
break;
}
final Socket socket = acceptedSocket;
final SocketChannel socketChannel = socket.getChannel();
final Thread thread = new Thread(new Runnable() {
@ -304,6 +308,14 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
thread.setName("Site-to-Site Worker Thread-" + (threadCount++));
LOG.debug("Handing connection to {}", thread);
thread.start();
threads.add(thread);
threads.removeIf(t -> !t.isAlive());
}
for(Thread thread : threads) {
if(thread != null) {
thread.interrupt();
}
}
}
});

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
@ -35,6 +36,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@ -81,13 +83,21 @@ public class StandardHttpContextMap extends AbstractControllerService implements
@OnEnabled
public void onConfigured(final ConfigurationContext context) {
maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger();
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("StandardHttpContextMap-" + getIdentifier());
return thread;
}
});
maxRequestNanos = context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
final long scheduleNanos = maxRequestNanos / 2;
executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS);
}
@OnShutdown
@OnDisabled
public void cleanup() {
if (executor != null) {