diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 81013f648a..f197bace70 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @@ -100,6 +101,11 @@ public class DistributedMapCacheClientService extends AbstractControllerService this.configContext = context; } + @OnStopped + public void onStopped() throws IOException { + close(); + } + @Override public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { return withCommsSession(new CommsAction() { @@ -292,14 +298,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { final String hostname = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).asInteger(); - final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final CommsSession commsSession; if (sslContextService == null) { - commsSession = new StandardCommsSession(hostname, port); + commsSession = new StandardCommsSession(hostname, port, timeoutMillis); } else { - commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); + commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis); } commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java index c1fa274eb2..34a0a7c170 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @@ -100,17 +101,22 @@ public class DistributedSetCacheClientService extends AbstractControllerService this.configContext = context; } + @OnStopped + public void onStopped() throws IOException { + close(); + } + public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { final String hostname = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).asInteger(); - final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final CommsSession commsSession; if (sslContextService == null) { - commsSession = new StandardCommsSession(hostname, port); + commsSession = new StandardCommsSession(hostname, port, timeoutMillis); } else { - commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); + commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis); } commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java index 7808d21458..18ca5714c2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java @@ -19,6 +19,8 @@ package org.apache.nifi.distributed.cache.client; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -44,8 +46,12 @@ public class SSLCommsSession implements CommsSession { private int protocolVersion; - public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { - sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, null, true); + public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port, final int timeoutMillis) throws IOException { + final SocketChannel socketChannel = SocketChannel.open(); + socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis); + socketChannel.configureBlocking(false); + + sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel,true); in = new SSLSocketChannelInputStream(sslSocketChannel); bufferedIn = new BufferedInputStream(in); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java index 6a8ee45a3d..7545bef55d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -47,9 +47,11 @@ public class StandardCommsSession implements CommsSession { private int protocolVersion; - public StandardCommsSession(final String hostname, final int port) throws IOException { - socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); + public StandardCommsSession(final String hostname, final int port, final int timeoutMillis) throws IOException { + socketChannel = SocketChannel.open(); + socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis); socketChannel.configureBlocking(false); + in = new SocketChannelInputStream(socketChannel); bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));