NIFI-383: Ensure that we always clean up sockets

This commit is contained in:
Mark Payne 2015-02-25 14:06:58 -05:00
parent 5aef55b5f9
commit ca23ad8eaa
6 changed files with 66 additions and 17 deletions

View File

@ -377,11 +377,6 @@ public class EndpointConnectionPool {
return false; return false;
} }
final String url = peer.getUrl();
if ( url == null ) {
return false;
}
final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription()); final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
if ( connectionQueue == null ) { if ( connectionQueue == null ) {
return false; return false;

View File

@ -64,7 +64,27 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
@Override @Override
public void close() throws IOException { public void close() throws IOException {
IOException suppressed = null;
try {
request.consume();
} catch (final IOException ioe) {
suppressed = ioe;
}
try {
channel.close(); channel.close();
} catch (final IOException ioe) {
if ( suppressed != null ) {
ioe.addSuppressed(suppressed);
}
throw ioe;
}
if ( suppressed != null ) {
throw suppressed;
}
} }
@Override @Override

View File

@ -54,7 +54,27 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication
@Override @Override
public void close() throws IOException { public void close() throws IOException {
IOException suppressed = null;
try {
request.consume();
} catch (final IOException ioe) {
suppressed = ioe;
}
try {
channel.close(); channel.close();
} catch (final IOException ioe) {
if ( suppressed != null ) {
ioe.addSuppressed(suppressed);
}
throw ioe;
}
if ( suppressed != null ) {
throw suppressed;
}
} }
@Override @Override

View File

@ -45,6 +45,8 @@ public class SocketChannelInputStream extends InputStream {
} }
public void consume() throws IOException { public void consume() throws IOException {
channel.shutdownInput();
final byte[] b = new byte[4096]; final byte[] b = new byte[4096];
final ByteBuffer buffer = ByteBuffer.wrap(b); final ByteBuffer buffer = ByteBuffer.wrap(b);
int bytesRead; int bytesRead;

View File

@ -259,6 +259,8 @@ public class SSLSocketChannel implements Closeable {
} }
public void consume() throws IOException { public void consume() throws IOException {
channel.shutdownInput();
final byte[] b = new byte[4096]; final byte[] b = new byte[4096];
final ByteBuffer buffer = ByteBuffer.wrap(b); final ByteBuffer buffer = ByteBuffer.wrap(b);
int readCount; int readCount;

View File

@ -150,6 +150,18 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString(); String url = getRemoteProcessGroup().getTargetUri().toString();
// If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
// we don't want to create a transaction at all.
final FlowFile firstFlowFile;
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
firstFlowFile = session.get();
if ( firstFlowFile == null ) {
return;
}
} else {
firstFlowFile = null;
}
final SiteToSiteClient client = clientRef.get(); final SiteToSiteClient client = clientRef.get();
final Transaction transaction; final Transaction transaction;
try { try {
@ -187,7 +199,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
try { try {
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) { if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
transferFlowFiles(transaction, context, session); transferFlowFiles(transaction, context, session, firstFlowFile);
} else { } else {
final int numReceived = receiveFlowFiles(transaction, context, session); final int numReceived = receiveFlowFiles(transaction, context, session);
if ( numReceived == 0 ) { if ( numReceived == 0 ) {
@ -196,14 +208,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} }
session.commit(); session.commit();
} catch (final Exception e) { } catch (final Throwable t) {
final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString()); final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString());
logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString()); logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
if ( logger.isDebugEnabled() ) { if ( logger.isDebugEnabled() ) {
logger.error("", e); logger.error("", t);
} }
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
transaction.error();
session.rollback(); session.rollback();
} }
} }
@ -216,11 +229,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} }
private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException { private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
FlowFile flowFile = session.get(); FlowFile flowFile = firstFlowFile;
if (flowFile == null) {
return 0;
}
try { try {
final String userDn = transaction.getCommunicant().getDistinguishedName(); final String userDn = transaction.getCommunicant().getDistinguishedName();