mirror of https://github.com/apache/nifi.git
NIFI-5663: Ensure that when sort Node Identifiers that we use both the node's API Address as well as API Port, in case 2 nodes are running on same host. Also ensure that when Local Node ID is determined that we update all Load Balancing Partitions, if necessary
This closes #3048. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
768bcfb509
commit
c87d791938
|
@ -571,7 +571,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
||||||
|
|
||||||
// Re-define 'queuePartitions' array
|
// Re-define 'queuePartitions' array
|
||||||
final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers);
|
final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers);
|
||||||
sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
|
sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort()));
|
||||||
|
|
||||||
final QueuePartition[] updatedQueuePartitions;
|
final QueuePartition[] updatedQueuePartitions;
|
||||||
if (sortedNodeIdentifiers.isEmpty()) {
|
if (sortedNodeIdentifiers.isEmpty()) {
|
||||||
|
@ -990,6 +990,14 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!nodeIdentifiers.contains(localNodeId)) {
|
||||||
|
final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
|
||||||
|
updatedNodeIds.add(localNodeId);
|
||||||
|
|
||||||
|
logger.debug("Local Node Identifier has now been determined to be {}. Adding to set of Node Identifiers for {}", localNodeId, SocketLoadBalancedFlowFileQueue.this);
|
||||||
|
setNodeIdentifiers(updatedNodeIds, false);
|
||||||
|
}
|
||||||
|
|
||||||
logger.debug("Local Node Identifier set to {}; current partitions = {}", localNodeId, queuePartitions);
|
logger.debug("Local Node Identifier set to {}; current partitions = {}", localNodeId, queuePartitions);
|
||||||
|
|
||||||
for (final QueuePartition partition : queuePartitions) {
|
for (final QueuePartition partition : queuePartitions) {
|
||||||
|
@ -1009,7 +1017,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
||||||
logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions",
|
logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions",
|
||||||
SocketLoadBalancedFlowFileQueue.this, localNodeId, partition);
|
SocketLoadBalancedFlowFileQueue.this, localNodeId, partition);
|
||||||
|
|
||||||
setNodeIdentifiers(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, true);
|
final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
|
||||||
|
updatedNodeIds.add(localNodeId);
|
||||||
|
setNodeIdentifiers(updatedNodeIds, true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue