NIFI-1301: Ensure that when creating site-to-site connection, if remote instance is applying backpressure that we do not block indefinitely waiting for the connection to be made

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2015-12-17 15:40:00 -05:00 committed by joewitt
parent 608287f9fe
commit bef3fc8b40
1 changed files with 33 additions and 31 deletions

View File

@ -250,26 +250,26 @@ public class EndpointConnectionPool {
EndpointConnection connection; EndpointConnection connection;
Peer peer = null; Peer peer = null;
logger.debug("{} getting next peer status", this); do {
final PeerStatus peerStatus = getNextPeerStatus(direction); final List<EndpointConnection> addBack = new ArrayList<>();
logger.debug("{} next peer status = {}", this, peerStatus); logger.debug("{} getting next peer status", this);
if (peerStatus == null) { final PeerStatus peerStatus = getNextPeerStatus(direction);
return null; logger.debug("{} next peer status = {}", this, peerStatus);
} if (peerStatus == null) {
return null;
final PeerDescription peerDescription = peerStatus.getPeerDescription();
BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
if (connectionQueue == null) {
connectionQueue = new LinkedBlockingQueue<>();
BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
if (existing != null) {
connectionQueue = existing;
} }
}
final List<EndpointConnection> addBack = new ArrayList<>(); final PeerDescription peerDescription = peerStatus.getPeerDescription();
try { BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
do { if (connectionQueue == null) {
connectionQueue = new LinkedBlockingQueue<>();
BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
if (existing != null) {
connectionQueue = existing;
}
}
try {
connection = connectionQueue.poll(); connection = connectionQueue.poll();
logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection); logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
final String portId = getPortIdentifier(direction); final String portId = getPortIdentifier(direction);
@ -387,21 +387,23 @@ public class EndpointConnectionPool {
protocol = connection.getSocketClientProtocol(); protocol = connection.getSocketClientProtocol();
} }
} }
} while (connection == null || codec == null || commsSession == null || protocol == null); } catch (final Throwable t) {
} catch (final Throwable t) { if (commsSession != null) {
if (commsSession != null) { try {
try { commsSession.close();
commsSession.close(); } catch (final IOException ioe) {
} catch (final IOException ioe) { }
}
throw t;
} finally {
if (!addBack.isEmpty()) {
connectionQueue.addAll(addBack);
addBack.clear();
} }
} }
throw t; } while (connection == null || codec == null || commsSession == null || protocol == null);
} finally {
if (!addBack.isEmpty()) {
connectionQueue.addAll(addBack);
}
}
activeConnections.add(connection); activeConnections.add(connection);
return connection; return connection;
@ -773,7 +775,7 @@ public class EndpointConnectionPool {
final StringBuilder distributionDescription = new StringBuilder(); final StringBuilder distributionDescription = new StringBuilder();
distributionDescription.append("New Weighted Distribution of Nodes:"); distributionDescription.append("New Weighted Distribution of Nodes:");
for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) { for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
final double percentage = entry.getValue() * 100D / (double) destinations.size(); final double percentage = entry.getValue() * 100D / destinations.size();
distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data"); distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
} }
logger.info(distributionDescription.toString()); logger.info(distributionDescription.toString());