NIFI-4391 Ensuring channel is closed when unable to connect in SocketChannelSender

NIFI-4391 Adding debug logging of client port upon connection

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2159.
This commit is contained in:
Bryan Bende 2017-09-18 10:01:16 -04:00 committed by Pierre Villard
parent 7b07eb0577
commit a813ae113e
2 changed files with 45 additions and 30 deletions

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
@ -42,40 +43,53 @@ public class SocketChannelSender extends ChannelSender {
@Override @Override
public void open() throws IOException { public void open() throws IOException {
if (channel == null) { try {
channel = SocketChannel.open(); if (channel == null) {
channel.configureBlocking(false); channel = SocketChannel.open();
channel.configureBlocking(false);
if (maxSendBufferSize > 0) { if (maxSendBufferSize > 0) {
channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize); channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF); final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
if (actualSendBufSize < maxSendBufferSize) { if (actualSendBufSize < maxSendBufferSize) {
logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
+ " bytes but could only set to " + actualSendBufSize + "bytes. You may want to " + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
+ "consider changing the Operating System's maximum send buffer"); + "consider changing the Operating System's maximum send buffer");
}
}
}
if (!channel.isConnected()) {
final long startTime = System.currentTimeMillis();
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port);
if (!channel.connect(socketAddress)) {
while (!channel.finishConnect()) {
if (System.currentTimeMillis() > startTime + timeout) {
throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
}
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
} }
} }
} }
socketChannelOutput = new SocketChannelOutputStream(channel); if (!channel.isConnected()) {
socketChannelOutput.setTimeout(timeout); final long startTime = System.currentTimeMillis();
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port);
if (!channel.connect(socketAddress)) {
while (!channel.finishConnect()) {
if (System.currentTimeMillis() > startTime + timeout) {
throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
}
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
}
}
}
if (logger.isDebugEnabled()) {
final SocketAddress localAddress = channel.getLocalAddress();
if (localAddress != null && localAddress instanceof InetSocketAddress) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) localAddress;
logger.debug("Connected to local port {}", new Object[] {inetSocketAddress.getPort()});
}
}
socketChannelOutput = new SocketChannelOutputStream(channel);
socketChannelOutput.setTimeout(timeout);
}
} catch (final IOException e) {
IOUtils.closeQuietly(channel);
throw e;
} }
} }

View File

@ -1305,7 +1305,8 @@ public class ProcessGroupResource extends ApplicationResource {
} }
private void activateControllerServices(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest, private void activateControllerServices(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) throws InterruptedException { final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep)
throws InterruptedException {
final Set<String> affectedServiceIds = affectedServices.stream() final Set<String> affectedServiceIds = affectedServices.stream()
.map(component -> component.getId()) .map(component -> component.getId())