diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 42428f60e2..885f3579dd 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -377,11 +377,6 @@ public class EndpointConnectionPool { return false; } - final String url = peer.getUrl(); - if ( url == null ) { - return false; - } - final BlockingQueue connectionQueue = connectionQueueMap.get(peer.getDescription()); if ( connectionQueue == null ) { return false; diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java index 0822b6adfc..8065f57027 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java @@ -64,7 +64,27 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe @Override public void close() throws IOException { - channel.close(); + IOException suppressed = null; + + try { + request.consume(); + } catch (final IOException ioe) { + suppressed = ioe; + } + + try { + channel.close(); + } catch (final IOException ioe) { + if ( suppressed != null ) { + ioe.addSuppressed(suppressed); + } + + throw ioe; + } + + if ( suppressed != null ) { + throw suppressed; + } } @Override diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java index dca1d847e5..50e916201e 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java @@ -54,7 +54,27 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication @Override public void close() throws IOException { - channel.close(); + IOException suppressed = null; + + try { + request.consume(); + } catch (final IOException ioe) { + suppressed = ioe; + } + + try { + channel.close(); + } catch (final IOException ioe) { + if ( suppressed != null ) { + ioe.addSuppressed(suppressed); + } + + throw ioe; + } + + if ( suppressed != null ) { + throw suppressed; + } } @Override diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java index f68c874a06..0ad0b74a43 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java @@ -45,6 +45,8 @@ public class SocketChannelInputStream extends InputStream { } public void consume() throws IOException { + channel.shutdownInput(); + final byte[] b = new byte[4096]; final ByteBuffer buffer = ByteBuffer.wrap(b); int bytesRead; diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 7c74b20730..249ad483fe 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -259,6 +259,8 @@ public class SSLSocketChannel implements Closeable { } public void consume() throws IOException { + channel.shutdownInput(); + final byte[] b = new byte[4096]; final ByteBuffer buffer = ByteBuffer.wrap(b); int readCount; diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index da9d0271de..740e405b87 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -150,6 +150,18 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { String url = getRemoteProcessGroup().getTargetUri().toString(); + // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise, + // we don't want to create a transaction at all. + final FlowFile firstFlowFile; + if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) { + firstFlowFile = session.get(); + if ( firstFlowFile == null ) { + return; + } + } else { + firstFlowFile = null; + } + final SiteToSiteClient client = clientRef.get(); final Transaction transaction; try { @@ -187,7 +199,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { try { if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) { - transferFlowFiles(transaction, context, session); + transferFlowFiles(transaction, context, session, firstFlowFile); } else { final int numReceived = receiveFlowFiles(transaction, context, session); if ( numReceived == 0 ) { @@ -196,14 +208,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } session.commit(); - } catch (final Exception e) { - final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString()); - logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString()); + } catch (final Throwable t) { + final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString()); + logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString()); if ( logger.isDebugEnabled() ) { - logger.error("", e); + logger.error("", t); } remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); + transaction.error(); session.rollback(); } } @@ -216,11 +229,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } - private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return 0; - } + private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException { + FlowFile flowFile = firstFlowFile; try { final String userDn = transaction.getCommunicant().getDistinguishedName();