NIFI-11289: Avoid obtaining read locks on queues when fetching Group Status, except in those few specific situations where it's needed.

This closes #7046

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-03-15 12:32:39 -04:00 committed by exceptionfactory
parent 776f180d15
commit 450a46bc58
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 68 additions and 64 deletions

View File

@ -93,8 +93,9 @@ public abstract class AbstractEventAccess implements EventAccess {
*/
@Override
public ProcessGroupStatus getGroupStatus(final String groupId) {
final RepositoryStatusReport repoStatusReport = generateRepositoryStatusReport();
return getGroupStatus(groupId, repoStatusReport);
final RepositoryStatusReport statusReport = generateRepositoryStatusReport();
final ProcessGroup group = flowManager.getGroup(groupId);
return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, true);
}
/**
@ -110,7 +111,7 @@ public abstract class AbstractEventAccess implements EventAccess {
final ProcessGroup group = flowManager.getGroup(groupId);
// this was invoked with no user context so the results will be unfiltered... necessary for aggregating status history
return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1);
return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, false);
}
protected RepositoryStatusReport generateRepositoryStatusReport() {
@ -128,10 +129,11 @@ public abstract class AbstractEventAccess implements EventAccess {
* @param isAuthorized is authorized check
* @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus
* @param currentDepth the current number of levels deep that we have recursed
* @param includeConnectionDetails whether or not to include the details of the connections that may be expensive to calculate and/or require locks be obtained
* @return the component status
*/
ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized,
final int recursiveStatusDepth, final int currentDepth) {
final int recursiveStatusDepth, final int currentDepth, final boolean includeConnectionDetails) {
if (group == null) {
return null;
}
@ -186,14 +188,14 @@ public abstract class AbstractEventAccess implements EventAccess {
for (final ProcessGroup childGroup : group.getProcessGroups()) {
final ProcessGroupStatus childGroupStatus;
if (populateChildStatuses) {
childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized, recursiveStatusDepth, currentDepth + 1);
childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails);
localChildGroupStatusCollection.add(childGroupStatus);
} else {
// In this case, we don't want to include any of the recursive components' individual statuses. As a result, we can
// avoid performing any sort of authorizations. Because we only care about the numbers that come back, we can just indicate
// that the user is not authorized. This allows us to avoid the expense of both performing the authorization and calculating
// things that we would otherwise need to calculate if the user were in fact authorized.
childGroupStatus = getGroupStatus(childGroup, statusReport, authorizable -> false, recursiveStatusDepth, currentDepth + 1);
childGroupStatus = getGroupStatus(childGroup, statusReport, authorizable -> false, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails);
}
activeGroupThreads += childGroupStatus.getActiveThreadCount();
@ -252,9 +254,14 @@ public abstract class AbstractEventAccess implements EventAccess {
connStatus.setDestinationName(isDestinationAuthorized ? conn.getDestination().getName() : conn.getDestination().getIdentifier());
connStatus.setBackPressureDataSizeThreshold(conn.getFlowFileQueue().getBackPressureDataSizeThreshold());
connStatus.setBackPressureObjectThreshold(conn.getFlowFileQueue().getBackPressureObjectThreshold());
connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now));
long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate();
connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate);
if (includeConnectionDetails) {
connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now));
long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate();
connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate);
} else {
connStatus.setTotalQueuedDuration(0L);
connStatus.setMaxQueuedDuration(0L);
}
connStatus.setFlowFileAvailability(conn.getFlowFileQueue().getFlowFileAvailability());
final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier());
@ -665,7 +672,11 @@ public abstract class AbstractEventAccess implements EventAccess {
*/
@Override
public ProcessGroupStatus getControllerStatus() {
return getGroupStatus(flowManager.getRootGroupId());
final String rootGroupId = flowManager.getRootGroupId();
final ProcessGroup group = flowManager.getGroup(rootGroupId);
final RepositoryStatusReport statusReport = generateRepositoryStatusReport();
return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, true);
}
@Override

View File

@ -64,7 +64,7 @@ public class SwappablePriorityQueue {
private final EventReporter eventReporter;
private final FlowFileQueue flowFileQueue;
private final DropFlowFileAction dropAction;
private final List<FlowFilePrioritizer> priorities = new ArrayList<>();
private volatile List<FlowFilePrioritizer> priorities = new ArrayList<>();
private final String swapPartitionName;
private final List<String> swapLocations = new ArrayList<>();
@ -85,6 +85,7 @@ public class SwappablePriorityQueue {
private PriorityQueue<FlowFileRecord> activeQueue;
private ArrayList<FlowFileRecord> swapQueue;
private boolean swapMode = false;
private volatile long topPenaltyExpiration = -1L;
// The following members are used to keep metrics in memory for reporting purposes so that we don't have to constantly
// read these values from swap files on disk.
@ -113,19 +114,13 @@ public class SwappablePriorityQueue {
}
public List<FlowFilePrioritizer> getPriorities() {
readLock.lock();
try {
return Collections.unmodifiableList(priorities);
} finally {
readLock.unlock("getPriorities");
}
return Collections.unmodifiableList(priorities);
}
public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
writeLock.lock();
try {
priorities.clear();
priorities.addAll(newPriorities);
this.priorities = new ArrayList<>(newPriorities);
final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new QueuePrioritizer(newPriorities));
newQueue.addAll(activeQueue);
@ -443,52 +438,12 @@ public class SwappablePriorityQueue {
public FlowFileAvailability getFlowFileAvailability() {
// If queue is empty, avoid obtaining a lock.
final FlowFileQueueSize queueSize = getFlowFileQueueSize();
if (queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0) {
if (isActiveQueueEmpty()) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}
boolean mustMigrateSwapToActive = false;
FlowFileRecord top;
readLock.lock();
try {
top = activeQueue.peek();
if (top == null) {
if (swapQueue.isEmpty() && queueSize.getSwapFileCount() > 0) {
// Nothing available in the active queue or swap queue, but there is data swapped out.
// We need to trigger that data to be swapped back in. But to do this, we need to hold the write lock.
// Because we cannot obtain the write lock while already holding the read lock, we set a flag so that we
// can migrate swap to active queue only after we've released the read lock.
mustMigrateSwapToActive = true;
} else if (swapQueue.isEmpty()) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
} else {
top = swapQueue.get(0);
}
}
} finally {
readLock.unlock("isFlowFileAvailable");
}
// If we need to migrate swapped data to the active queue, we can do that now that the read lock has been released.
// There may well be multiple threads attempting this concurrently, though, so only use tryLock() and if the lock
// is not obtained, the other thread can swap data in, or the next iteration of #getFlowFileAvailability will.
if (mustMigrateSwapToActive) {
final boolean lockObtained = writeLock.tryLock();
if (lockObtained) {
try {
migrateSwapToActive();
} finally {
writeLock.unlock("getFlowFileAvailability");
}
}
}
if (top == null) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}
if (top.isPenalized()) {
final long expiration = topPenaltyExpiration;
if (expiration > 0 && expiration > System.currentTimeMillis()) { // compare against 0 to avoid unnecessary System call
return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
}
@ -525,6 +480,7 @@ public class SwappablePriorityQueue {
activeQueue.add(flowFile);
}
updateTopPenaltyExpiration();
logger.trace("{} put to {}", flowFile, this);
} finally {
writeLock.unlock("put(FlowFileRecord)");
@ -550,6 +506,7 @@ public class SwappablePriorityQueue {
activeQueue.addAll(flowFiles);
}
updateTopPenaltyExpiration();
logger.trace("{} put to {}", flowFiles, this);
} finally {
writeLock.unlock("putAll");
@ -573,6 +530,8 @@ public class SwappablePriorityQueue {
unacknowledge(1, flowFile.getSize());
}
updateTopPenaltyExpiration();
return flowFile;
} finally {
writeLock.unlock("poll(Set)");
@ -624,6 +583,7 @@ public class SwappablePriorityQueue {
writeLock.lock();
try {
doPoll(records, maxResults, expiredRecords, expirationMillis, pollStrategy);
updateTopPenaltyExpiration();
} finally {
writeLock.unlock("poll(int, Set)");
}
@ -704,12 +664,25 @@ public class SwappablePriorityQueue {
}
}
updateTopPenaltyExpiration();
return selectedFlowFiles;
} finally {
writeLock.unlock("poll(Filter, Set)");
}
}
// MUST be called while holding read lock or write lock
private void updateTopPenaltyExpiration() {
final FlowFileRecord top = activeQueue.peek();
if (top == null) {
topPenaltyExpiration = -1L;
return;
}
topPenaltyExpiration = top.getPenaltyExpirationMillis();
}
private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
migrateSwapToActive();
@ -997,6 +970,7 @@ public class SwappablePriorityQueue {
incrementSwapQueueSize(swapFlowFileCount, swapByteCount, swapLocations.size());
this.swapLocations.addAll(swapLocations);
updateTopPenaltyExpiration();
} finally {
writeLock.unlock("Recover Swap Files");
}

View File

@ -113,7 +113,7 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar
final ProcessGroup group = flowManager.getGroup(groupId);
// on demand status request for a specific user... require authorization per component and filter results as appropriate
return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(this.authorizer, RequestAction.READ, user), Integer.MAX_VALUE, 1);
return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(this.authorizer, RequestAction.READ, user), Integer.MAX_VALUE, 1, false);
}
/**
@ -144,6 +144,6 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar
final ProcessGroup group = flowManager.getGroup(groupId);
// on demand status request for a specific user... require authorization per component and filter results as appropriate
return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(this.authorizer, RequestAction.READ, user), recursiveStatusDepth, 1);
return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(this.authorizer, RequestAction.READ, user), recursiveStatusDepth, 1, false);
}
}

View File

@ -153,9 +153,28 @@ public class TestSocketLoadBalancedFlowFileQueue {
assertFalse(queue.isEmpty());
assertSame(FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED, queue.getFlowFileAvailability());
// Adjust the penalty expiration so that it's not longer penalized.
// This will not change the FlowFile Availability, however, because it has already stored the
// Penalty expiration date elsewhere. To trigger that to change, we need to add something to the queue
// or remove something. We don't want to remove the data yet, so we add a new FlowFile.
penalizedFlowFile.setPenaltyExpiration(System.currentTimeMillis() - 1);
final MockFlowFileRecord readyFlowFile = new MockFlowFileRecord(1);
queue.put(readyFlowFile);
assertFalse(queue.isEmpty());
assertSame(FlowFileAvailability.FLOWFILE_AVAILABLE, queue.getFlowFileAvailability());
assertSame(penalizedFlowFile, queue.poll(Collections.emptySet()));
assertFalse(queue.isEmpty());
assertSame(FlowFileAvailability.FLOWFILE_AVAILABLE, queue.getFlowFileAvailability());
assertSame(readyFlowFile, queue.poll(Collections.emptySet()));
assertTrue(queue.isActiveQueueEmpty());
assertSame(FlowFileAvailability.ACTIVE_QUEUE_EMPTY, queue.getFlowFileAvailability());
queue.acknowledge(penalizedFlowFile);
queue.acknowledge(readyFlowFile);
assertTrue(queue.isEmpty());
}
@Test