diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java index 3534f955b8..24280781ca 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java @@ -25,6 +25,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession; public class Peer implements Communicant { + private final PeerDescription description; private final CommunicationsSession commsSession; private final String url; private final String clusterUrl; @@ -34,7 +35,8 @@ public class Peer implements Communicant { private final Map penaltyExpirationMap = new HashMap<>(); private boolean closed = false; - public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) { + public Peer(final PeerDescription description, final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) { + this.description = description; this.commsSession = commsSession; this.url = peerUrl; this.clusterUrl = clusterUrl; @@ -48,6 +50,10 @@ public class Peer implements Communicant { } } + public PeerDescription getDescription() { + return description; + } + @Override public String getUrl() { return url; diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java new file mode 100644 index 0000000000..0e8e49897b --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public class PeerDescription { + private final String hostname; + private final int port; + private final boolean secure; + + public PeerDescription(final String hostname, final int port, final boolean secure) { + this.hostname = hostname; + this.port = port; + this.secure = secure; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + public boolean isSecure() { + return secure; + } + + @Override + public String toString() { + return "PeerDescription[hostname=" + hostname + ", port=" + port + ", secure=" + secure + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((hostname == null) ? 0 : hostname.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + final PeerDescription other = (PeerDescription) obj; + if (hostname == null) { + if (other.hostname != null) { + return false; + } + } else if (!hostname.equals(other.hostname)) { + return false; + } + + return port == other.port; + } +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java index d1cb0766d6..b68ac33648 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java @@ -17,43 +17,31 @@ package org.apache.nifi.remote; public class PeerStatus { - - private final String hostname; - private final int port; - private final boolean secure; + private final PeerDescription description; private final int numFlowFiles; - public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) { - this.hostname = hostname; - this.port = port; - this.secure = secure; + public PeerStatus(final PeerDescription description, final int numFlowFiles) { + this.description = description; this.numFlowFiles = numFlowFiles; } - public String getHostname() { - return hostname; + public PeerDescription getPeerDescription() { + return description; } - - public int getPort() { - return port; - } - - public boolean isSecure() { - return secure; - } - + public int getFlowFileCount() { return numFlowFiles; } @Override public String toString() { - return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]"; + return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() + + ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]"; } @Override public int hashCode() { - return 9824372 + hostname.hashCode() + port; + return 9824372 + description.getHostname().hashCode() + description.getPort() * 41; } @Override @@ -67,6 +55,6 @@ public class PeerStatus { } final PeerStatus other = (PeerStatus) obj; - return port == other.port && hostname.equals(other.hostname); + return description.equals(other.getPeerDescription()); } } diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index f9a8a383bf..450daec423 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -61,6 +61,7 @@ import javax.security.cert.CertificateNotYetValidException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.RemoteResourceInitiator; @@ -97,8 +98,8 @@ public class EndpointConnectionPool { private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class); - private final BlockingQueue connectionQueue = new LinkedBlockingQueue<>(); - private final ConcurrentMap peerTimeoutExpirations = new ConcurrentHashMap<>(); + private final ConcurrentMap> connectionQueueMap = new ConcurrentHashMap<>(); + private final ConcurrentMap peerTimeoutExpirations = new ConcurrentHashMap<>(); private final URI clusterUrl; private final String apiUri; @@ -227,6 +228,23 @@ public class EndpointConnectionPool { SocketClientProtocol protocol = null; EndpointConnection connection; Peer peer = null; + + logger.debug("{} getting next peer status", this); + final PeerStatus peerStatus = getNextPeerStatus(direction); + logger.debug("{} next peer status = {}", this, peerStatus); + if ( peerStatus == null ) { + return null; + } + + final PeerDescription peerDescription = peerStatus.getPeerDescription(); + BlockingQueue connectionQueue = connectionQueueMap.get(peerStatus); + if ( connectionQueue == null ) { + connectionQueue = new LinkedBlockingQueue<>(); + BlockingQueue existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue); + if ( existing != null ) { + connectionQueue = existing; + } + } final List addBack = new ArrayList<>(); try { @@ -254,19 +272,12 @@ public class EndpointConnectionPool { protocol = new SocketClientProtocol(); protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId)); - logger.debug("{} getting next peer status", this); - final PeerStatus peerStatus = getNextPeerStatus(direction); - logger.debug("{} next peer status = {}", this, peerStatus); - if ( peerStatus == null ) { - return null; - } - final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS); try { logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus); commsSession = establishSiteToSiteConnection(peerStatus); } catch (final IOException ioe) { - penalize(peerStatus, penalizationMillis); + penalize(peerStatus.getPeerDescription(), penalizationMillis); throw ioe; } @@ -283,8 +294,8 @@ public class EndpointConnectionPool { } } - final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort(); - peer = new Peer(commsSession, peerUrl, clusterUrl.toString()); + final String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort(); + peer = new Peer(peerDescription, commsSession, peerUrl, clusterUrl.toString()); // set properties based on config if ( config != null ) { @@ -371,6 +382,11 @@ public class EndpointConnectionPool { return false; } + final BlockingQueue connectionQueue = connectionQueueMap.get(peer.getDescription()); + if ( connectionQueue == null ) { + return false; + } + activeConnections.remove(endpointConnection); if ( shutdown ) { terminate(endpointConnection); @@ -381,14 +397,14 @@ public class EndpointConnectionPool { } } - private void penalize(final PeerStatus status, final long penalizationMillis) { - Long expiration = peerTimeoutExpirations.get(status); + private void penalize(final PeerDescription peerDescription, final long penalizationMillis) { + Long expiration = peerTimeoutExpirations.get(peerDescription); if ( expiration == null ) { expiration = Long.valueOf(0L); } final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); - peerTimeoutExpirations.put(status, Long.valueOf(newExpiration)); + peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration)); } /** @@ -396,19 +412,7 @@ public class EndpointConnectionPool { * @param peer */ public void penalize(final Peer peer, final long penalizationMillis) { - String host; - int port; - try { - final URI uri = new URI(peer.getUrl()); - host = uri.getHost(); - port = uri.getPort(); - } catch (final URISyntaxException e) { - host = peer.getHost(); - port = -1; - } - - final PeerStatus status = new PeerStatus(host, port, true, 1); - penalize(status, penalizationMillis); + penalize(peer.getDescription(), penalizationMillis); } private void cleanup(final SocketClientProtocol protocol, final Peer peer) { @@ -509,7 +513,8 @@ public class EndpointConnectionPool { final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); final List nodeInfos = new ArrayList<>(); for ( final PeerStatus peerStatus : statuses ) { - final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount()); + final PeerDescription description = peerStatus.getPeerDescription(); + final NodeInformation nodeInfo = new NodeInformation(description.getHostname(), description.getPort(), 0, description.isSecure(), peerStatus.getFlowFileCount()); nodeInfos.add(nodeInfo); } clusterNodeInfo.setNodeInformation(nodeInfos); @@ -526,7 +531,7 @@ public class EndpointConnectionPool { if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { final Set equalizedSet = new HashSet<>(cache.getStatuses().size()); for (final PeerStatus status : cache.getStatuses()) { - final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1); + final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1); equalizedSet.add(equalizedStatus); } @@ -543,8 +548,9 @@ public class EndpointConnectionPool { throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications"); } + final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://")); final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); - final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString()); + final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString()); final SocketClientProtocol clientProtocol = new SocketClientProtocol(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); @@ -602,7 +608,8 @@ public class EndpointConnectionPool { final OutputStream out = new BufferedOutputStream(fos)) { for (final PeerStatus status : statuses) { - final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n"; + final PeerDescription description = status.getPeerDescription(); + final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n"; out.write(line.getBytes(StandardCharsets.UTF_8)); } @@ -631,7 +638,7 @@ public class EndpointConnectionPool { final int port = Integer.parseInt(splits[1]); final boolean secure = Boolean.parseBoolean(splits[2]); - statuses.add(new PeerStatus(hostname, port, secure, 1)); + statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1)); } } @@ -640,7 +647,8 @@ public class EndpointConnectionPool { private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { - return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort()); + final PeerDescription description = peerStatus.getPeerDescription(); + return establishSiteToSiteConnection(description.getHostname(), description.getPort()); } private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException { @@ -720,7 +728,8 @@ public class EndpointConnectionPool { final int index = n % destinations.size(); PeerStatus status = destinations.get(index); if ( status == null ) { - status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles()); + final PeerDescription description = new PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure()); + status = new PeerStatus(description, nodeInfo.getTotalFlowFiles()); destinations.set(index, status); break; } else { @@ -744,27 +753,29 @@ public class EndpointConnectionPool { private void cleanupExpiredSockets() { - final List connections = new ArrayList<>(); - - EndpointConnection connection; - while ((connection = connectionQueue.poll()) != null) { - // If the socket has not been used in 10 seconds, shut it down. - final long lastUsed = connection.getLastTimeUsed(); - if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) { - try { - connection.getSocketClientProtocol().shutdown(connection.getPeer()); - } catch (final Exception e) { - logger.debug("Failed to shut down {} using {} due to {}", - new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} ); + for ( final BlockingQueue connectionQueue : connectionQueueMap.values()) { + final List connections = new ArrayList<>(); + + EndpointConnection connection; + while ((connection = connectionQueue.poll()) != null) { + // If the socket has not been used in 10 seconds, shut it down. + final long lastUsed = connection.getLastTimeUsed(); + if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) { + try { + connection.getSocketClientProtocol().shutdown(connection.getPeer()); + } catch (final Exception e) { + logger.debug("Failed to shut down {} using {} due to {}", + new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} ); + } + + terminate(connection); + } else { + connections.add(connection); } - - terminate(connection); - } else { - connections.add(connection); } + + connectionQueue.addAll(connections); } - - connectionQueue.addAll(connections); } public void shutdown() { @@ -775,10 +786,12 @@ public class EndpointConnectionPool { for ( final EndpointConnection conn : activeConnections ) { conn.getPeer().getCommunicationsSession().interrupt(); } - - EndpointConnection state; - while ( (state = connectionQueue.poll()) != null) { - cleanup(state.getSocketClientProtocol(), state.getPeer()); + + for ( final BlockingQueue connectionQueue : connectionQueueMap.values() ) { + EndpointConnection state; + while ( (state = connectionQueue.poll()) != null) { + cleanup(state.getSocketClientProtocol(), state.getPeer()); + } } } diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 390f4fc89e..c3275eab00 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -34,6 +34,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.RemoteResourceInitiator; @@ -117,7 +118,7 @@ public class SocketClientProtocol implements ClientProtocol { properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression)); if ( destinationId != null ) { - properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier()); + properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId); } properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) ); @@ -229,7 +230,7 @@ public class SocketClientProtocol implements ClientProtocol { final int port = dis.readInt(); final boolean secure = dis.readBoolean(); final int flowFileCount = dis.readInt(); - peers.add(new PeerStatus(hostname, port, secure, flowFileCount)); + peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount)); } logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java index 275e40c0ab..cb7af08e84 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java @@ -41,7 +41,7 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + System.out.println(peerStatus.getPeerDescription()); } } @@ -55,7 +55,7 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + System.out.println(peerStatus.getPeerDescription()); } } @@ -75,7 +75,7 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + System.out.println(peerStatus.getPeerDescription()); } } @@ -89,7 +89,7 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + System.out.println(peerStatus.getPeerDescription()); } } } diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java index bb16a3473e..0f48b03a14 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -43,26 +43,31 @@ public class TestSiteToSiteClient { final SiteToSiteClient client = new SiteToSiteClient.Builder() .url("http://localhost:8080/nifi") .portName("cba") - .requestBatchCount(1) + .requestBatchCount(10) .build(); try { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - Assert.assertNotNull(transaction); - - final DataPacket packet = transaction.receive(); - Assert.assertNotNull(packet); - - final InputStream in = packet.getData(); - final long size = packet.getSize(); - final byte[] buff = new byte[(int) size]; - - StreamUtils.fillBuffer(in, buff); - - Assert.assertNull(transaction.receive()); - - transaction.confirm(); - transaction.complete(); + for (int i=0; i < 1000; i++) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + Assert.assertNotNull(transaction); + + DataPacket packet; + while (true) { + packet = transaction.receive(); + if ( packet == null ) { + break; + } + + final InputStream in = packet.getData(); + final long size = packet.getSize(); + final byte[] buff = new byte[(int) size]; + + StreamUtils.fillBuffer(in, buff); + } + + transaction.confirm(); + transaction.complete(); + } } finally { client.close(); } @@ -70,7 +75,7 @@ public class TestSiteToSiteClient { @Test - //@Ignore("For local testing only; not really a unit test but a manual test") + @Ignore("For local testing only; not really a unit test but a manual test") public void testSend() throws IOException { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index 8a4839bf4a..493d1fec0e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -206,7 +206,8 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { protocol.setRootProcessGroup(rootGroup.get()); protocol.setNodeInformant(nodeInformant); - peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort()); + final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null); + peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort()); LOG.debug("Handshaking...."); protocol.handshake(peer);