NIFI-3732 Adding connect with timeout to StandardCommsSession and SSLCommsSession to avoid blocking

This closes #1842.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Bryan Bende 2017-05-22 20:51:04 -04:00 committed by Koji Kawamura
parent ded396f0ef
commit a8de27e69b
4 changed files with 30 additions and 10 deletions

View File

@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
@ -100,6 +101,11 @@ public class DistributedMapCacheClientService extends AbstractControllerService
this.configContext = context; this.configContext = context;
} }
@OnStopped
public void onStopped() throws IOException {
close();
}
@Override @Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withCommsSession(new CommsAction<Boolean>() { return withCommsSession(new CommsAction<Boolean>() {
@ -292,14 +298,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService
public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
final String hostname = context.getProperty(HOSTNAME).getValue(); final String hostname = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger(); final int port = context.getProperty(PORT).asInteger();
final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final CommsSession commsSession; final CommsSession commsSession;
if (sslContextService == null) { if (sslContextService == null) {
commsSession = new StandardCommsSession(hostname, port); commsSession = new StandardCommsSession(hostname, port, timeoutMillis);
} else { } else {
commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis);
} }
commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);

View File

@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
@ -100,17 +101,22 @@ public class DistributedSetCacheClientService extends AbstractControllerService
this.configContext = context; this.configContext = context;
} }
@OnStopped
public void onStopped() throws IOException {
close();
}
public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
final String hostname = context.getProperty(HOSTNAME).getValue(); final String hostname = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger(); final int port = context.getProperty(PORT).asInteger();
final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final CommsSession commsSession; final CommsSession commsSession;
if (sslContextService == null) { if (sslContextService == null) {
commsSession = new StandardCommsSession(hostname, port); commsSession = new StandardCommsSession(hostname, port, timeoutMillis);
} else { } else {
commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis);
} }
commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);

View File

@ -19,6 +19,8 @@ package org.apache.nifi.distributed.cache.client;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -44,8 +46,12 @@ public class SSLCommsSession implements CommsSession {
private int protocolVersion; private int protocolVersion;
public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port, final int timeoutMillis) throws IOException {
sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, null, true); final SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis);
socketChannel.configureBlocking(false);
sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel,true);
in = new SSLSocketChannelInputStream(sslSocketChannel); in = new SSLSocketChannelInputStream(sslSocketChannel);
bufferedIn = new BufferedInputStream(in); bufferedIn = new BufferedInputStream(in);

View File

@ -47,9 +47,11 @@ public class StandardCommsSession implements CommsSession {
private int protocolVersion; private int protocolVersion;
public StandardCommsSession(final String hostname, final int port) throws IOException { public StandardCommsSession(final String hostname, final int port, final int timeoutMillis) throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); socketChannel = SocketChannel.open();
socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis);
socketChannel.configureBlocking(false); socketChannel.configureBlocking(false);
in = new SocketChannelInputStream(socketChannel); in = new SocketChannelInputStream(socketChannel);
bufferedIn = new InterruptableInputStream(new BufferedInputStream(in)); bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));