mirror of https://github.com/apache/nifi.git
NIFI-5919: Addressed a race condition that can exist if adding FlowFiles to a FlowFileQueue before adjusting the size of the queue to account for the FlowFiles
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #3238.
This commit is contained in:
parent
595a2decc6
commit
fea17d0ca8
|
@ -769,8 +769,26 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
|||
putAll(flowFiles);
|
||||
} else {
|
||||
logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles);
|
||||
localPartition.putAll(flowFiles);
|
||||
|
||||
// As explained in the putAllAndGetPartitions() method, we must ensure that we call adjustSize() before we
|
||||
// put the FlowFiles on the queue. Otherwise, we will encounter a race condition. Specifically, that race condition
|
||||
// can play out like so:
|
||||
//
|
||||
// Thread 1: Call localPartition.putAll() when the queue is empty (has a queue size of 0) but has not yet adjusted the size.
|
||||
// Thread 2: Call poll() to obtain the FlowFile just received.
|
||||
// Thread 2: Transfer the FlowFile to some Relationship
|
||||
// Thread 2: Commit the session, which will call acknowledge on this queue.
|
||||
// Thread 2: The acknowledge() method attempts to decrement the size of the queue to -1.
|
||||
// This causes an Exception to be thrown and the queue size to remain at 0.
|
||||
// However, the FlowFile has already been successfully transferred to the next Queue.
|
||||
// Thread 1: Call adjustSize() to increment the size of the queue to 1 FlowFile.
|
||||
//
|
||||
// In this scenario, we now have no FlowFiles in the queue. However, the queue size is set to 1.
|
||||
// We can avoid this race condition by simply ensuring that we call adjustSize() before making the FlowFiles
|
||||
// available on the queue. This way, we cannot possibly obtain the FlowFiles and process/acknowledge them before the queue
|
||||
// size has been updated to account for them and therefore we will not attempt to assign a negative queue size.
|
||||
adjustSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
|
||||
localPartition.putAll(flowFiles);
|
||||
}
|
||||
} finally {
|
||||
partitionReadLock.unlock();
|
||||
|
|
Loading…
Reference in New Issue