NIFI-6696: Ensured that callback to RemoteQueuePartition do not attempt to obtain the Partition Read Lock. The Read Lock is not necessary as long as the 'partitioner' is volatile, because it doesn't matter whether or not the actual partitions themselves change, since the only partition that would be touched is the Rebalancing Partition,on, which is fixed. Obtaining the partition read lock can lead to a deadlock as outlined in the Jira description.

This closes #3760.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-09-20 15:05:25 -04:00 committed by Bryan Bende
parent 6d8968cc87
commit c721a9ee5f
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed files with 17 additions and 36 deletions

View File

@ -114,7 +114,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
private final Lock partitionReadLock = partitionLock.readLock(); private final Lock partitionReadLock = partitionLock.readLock();
private final Lock partitionWriteLock = partitionLock.writeLock(); private final Lock partitionWriteLock = partitionLock.writeLock();
private QueuePartition[] queuePartitions; private QueuePartition[] queuePartitions;
private FlowFilePartitioner partitioner; private volatile FlowFilePartitioner partitioner;
private boolean stopped = true; private boolean stopped = true;
private volatile boolean offloaded = false; private volatile boolean offloaded = false;
@ -337,51 +337,32 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return; return;
} }
partitionReadLock.lock(); if (isRebalanceOnFailure(partitionerUsed)) {
try { logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId);
if (isRebalanceOnFailure(partitionerUsed)) { rebalancingPartition.rebalance(flowFiles);
logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId); } else {
rebalancingPartition.rebalance(flowFiles); logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are",
} else { flowFiles.size(), nodeId, partitionerUsed);
logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are", flowFiles.size(), nodeId, partitionQueue.putAll(flowFiles);
partitioner);
partitionQueue.putAll(flowFiles);
}
} finally {
partitionReadLock.unlock();
} }
} }
@Override @Override
public void putAll(final Function<String, FlowFileQueueContents> queueContentsFunction, final FlowFilePartitioner partitionerUsed) { public void putAll(final Function<String, FlowFileQueueContents> queueContentsFunction, final FlowFilePartitioner partitionerUsed) {
partitionReadLock.lock(); if (isRebalanceOnFailure(partitionerUsed)) {
try { final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName());
if (isRebalanceOnFailure(partitionerUsed)) { rebalancingPartition.rebalance(contents);
final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName()); logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition",
rebalancingPartition.rebalance(contents); contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId);
logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition", } else {
contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId); logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are",
} else { nodeId, partitionerUsed);
logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are", nodeId,
partitioner);
}
} finally {
partitionReadLock.unlock();
} }
} }
@Override @Override
public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) { public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) {
partitionReadLock.lock(); return partitionerUsed.isRebalanceOnFailure() || !partitionerUsed.equals(partitioner);
try {
if (!partitionerUsed.equals(partitioner)) {
return true;
}
return partitioner.isRebalanceOnFailure();
} finally {
partitionReadLock.unlock();
}
} }
}; };