diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 88a34aaa3b..18075dbbeb 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -250,26 +250,26 @@ public class EndpointConnectionPool { EndpointConnection connection; Peer peer = null; - logger.debug("{} getting next peer status", this); - final PeerStatus peerStatus = getNextPeerStatus(direction); - logger.debug("{} next peer status = {}", this, peerStatus); - if (peerStatus == null) { - return null; - } - - final PeerDescription peerDescription = peerStatus.getPeerDescription(); - BlockingQueue connectionQueue = connectionQueueMap.get(peerDescription); - if (connectionQueue == null) { - connectionQueue = new LinkedBlockingQueue<>(); - BlockingQueue existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue); - if (existing != null) { - connectionQueue = existing; + do { + final List addBack = new ArrayList<>(); + logger.debug("{} getting next peer status", this); + final PeerStatus peerStatus = getNextPeerStatus(direction); + logger.debug("{} next peer status = {}", this, peerStatus); + if (peerStatus == null) { + return null; } - } - final List addBack = new ArrayList<>(); - try { - do { + final PeerDescription peerDescription = peerStatus.getPeerDescription(); + BlockingQueue connectionQueue = connectionQueueMap.get(peerDescription); + if (connectionQueue == null) { + connectionQueue = new LinkedBlockingQueue<>(); + BlockingQueue existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue); + if (existing != null) { + connectionQueue = existing; + } + } + + try { connection = connectionQueue.poll(); logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection); final String portId = getPortIdentifier(direction); @@ -387,21 +387,23 @@ public class EndpointConnectionPool { protocol = connection.getSocketClientProtocol(); } } - } while (connection == null || codec == null || commsSession == null || protocol == null); - } catch (final Throwable t) { - if (commsSession != null) { - try { - commsSession.close(); - } catch (final IOException ioe) { + } catch (final Throwable t) { + if (commsSession != null) { + try { + commsSession.close(); + } catch (final IOException ioe) { + } + } + + throw t; + } finally { + if (!addBack.isEmpty()) { + connectionQueue.addAll(addBack); + addBack.clear(); } } - throw t; - } finally { - if (!addBack.isEmpty()) { - connectionQueue.addAll(addBack); - } - } + } while (connection == null || codec == null || commsSession == null || protocol == null); activeConnections.add(connection); return connection; @@ -773,7 +775,7 @@ public class EndpointConnectionPool { final StringBuilder distributionDescription = new StringBuilder(); distributionDescription.append("New Weighted Distribution of Nodes:"); for (final Map.Entry entry : entryCountMap.entrySet()) { - final double percentage = entry.getValue() * 100D / (double) destinations.size(); + final double percentage = entry.getValue() * 100D / destinations.size(); distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data"); } logger.info(distributionDescription.toString());