NIFI-10148: Fixed bug in SwappablePriorityQueue, in which the Active Queue Size could be decremented before the Unacknowledged Queue Size is incremented when FlowFiles are polled. This can result in the SwappablePrioriotyQueue incorrectly returning a value of true from the isEmpty() method. Additionally, updated the NiFiSystemIT so that if waiting for a particular queue size we periodically log size of all queues, which aids in debugging, and added necessary methods to FlowClient to make that happen.

This closes #6143

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-06-21 12:45:53 -04:00 committed by exceptionfactory
parent b60b9b8e29
commit ac8c1b0326
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
6 changed files with 157 additions and 24 deletions

View File

@ -175,7 +175,7 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
@Override
public boolean isEmpty() {
return queue.getFlowFileQueueSize().isEmpty();
return queue.isEmpty();
}
@Override

View File

@ -497,7 +497,7 @@ public class SwappablePriorityQueue {
public void acknowledge(final FlowFileRecord flowFile) {
logger.trace("{} Acknowledging {}", this, flowFile);
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
directlyIncrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
}
public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
@ -508,7 +508,7 @@ public class SwappablePriorityQueue {
}
final long totalSize = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
directlyIncrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
}
@ -570,7 +570,7 @@ public class SwappablePriorityQueue {
if (flowFile != null) {
logger.trace("{} poll() returning {}", this, flowFile);
incrementUnacknowledgedQueueSize(1, flowFile.getSize());
unacknowledge(1, flowFile.getSize());
}
return flowFile;
@ -604,10 +604,6 @@ public class SwappablePriorityQueue {
flowFile = null;
break;
}
if (flowFile != null) {
incrementActiveQueueSize(-1, -flowFile.getSize());
}
} while (isExpired);
if (!expiredRecords.isEmpty()) {
@ -648,6 +644,8 @@ public class SwappablePriorityQueue {
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
long bytesPulled = 0L;
int flowFilesPulled = 0;
long bytesExpired = 0L;
int flowFilesExpired = 0;
writeLock.lock();
try {
@ -665,8 +663,8 @@ public class SwappablePriorityQueue {
final boolean isExpired = isExpired(flowFile, expirationMillis);
if (isExpired) {
expiredRecords.add(flowFile);
bytesPulled += flowFile.getSize();
flowFilesPulled++;
bytesExpired += flowFile.getSize();
flowFilesExpired++;
if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
break;
@ -683,7 +681,6 @@ public class SwappablePriorityQueue {
bytesPulled += flowFile.getSize();
flowFilesPulled++;
incrementUnacknowledgedQueueSize(1, flowFile.getSize());
selectedFlowFiles.add(flowFile);
} else {
unselected.add(flowFile);
@ -695,7 +692,11 @@ public class SwappablePriorityQueue {
}
this.activeQueue.addAll(unselected);
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
unacknowledge(flowFilesPulled, bytesPulled);
if (flowFilesExpired > 0) {
incrementActiveQueueSize(-flowFilesExpired, -bytesExpired);
}
if (!selectedFlowFiles.isEmpty() && logger.isTraceEnabled()) {
for (final FlowFileRecord flowFile : selectedFlowFiles) {
@ -719,8 +720,10 @@ public class SwappablePriorityQueue {
expiredBytes += record.getSize();
}
incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
unacknowledge(records.size(), bytesDrained);
if (!expiredRecords.isEmpty()) {
incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
}
}
@ -1095,7 +1098,32 @@ public class SwappablePriorityQueue {
}
}
private void incrementUnacknowledgedQueueSize(final int count, final long bytes) {
/**
* Increments the unacknowledged queue size and decrements the active queue size by the given values.
* This logic is extracted into a separate method because it is critical that we always increment the unacknowledged queue size
* before decrementing the active queue size. Failure to do so could result in the queue temporarily reporting a size of 0/empty
* when the queue is not empty (because we could decrement active queue size to 0 before incrementing unacknowledged queue size to 1).
* To help ensure that we don't break that ordering, we encapsulate the logic into a single method that is responsible for performing
* these steps, and we also used the name 'directlyIncrementUnacknowledgedQueueSize' for the method that updates the unacknowledged queue size, rather than simply
* 'incrementUnacknowledgedQueueSize' to give a hint to any callers that caution must be taken when calling the method.
*
* @param count the number of FlowFiles to increase the unacknowledged count by and decrement active count by
* @param bytes the bytes to increase the unacknowledged count by and decrement the active count by
*/
private void unacknowledge(final int count, final long bytes) {
directlyIncrementUnacknowledgedQueueSize(count, bytes);
incrementActiveQueueSize(-count, -bytes);
}
/**
* Increments the Unacknowledged Queue Size by the given arguments. Note that when data is polled, we need to both increment the unacknowledged size
* AND decrement the active size. But it is crucial that we perform these actions in the proper order. The Unacknowledged size must be incremented before
* the active queue size is decremented. For this purpose, use {@link #unacknowledge(int, long)} instead of using this method directly.
*
* @param count the number of FlowFiles to increase the unacknowledged count by and decrement active count by
* @param bytes the bytes to increase the unacknowledged count by and decrement the active count by
*/
private void directlyIncrementUnacknowledgedQueueSize(final int count, final long bytes) {
boolean updated = false;
while (!updated) {
final FlowFileQueueSize original = size.get();

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
@ -189,13 +190,7 @@ public class TestStandardFlowFileQueue {
Thread.sleep(100L);
final FlowFileFilter filter = new FlowFileFilter() {
@Override
public FlowFileFilterResult filter(final FlowFile flowFile) {
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
};
final FlowFileFilter filter = flowFile -> FlowFileFilterResult.REJECT_AND_CONTINUE;
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final List<FlowFileRecord> polled = queue.poll(filter, expiredRecords);

View File

@ -21,11 +21,16 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.junit.jupiter.api.AfterEach;
import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
@ -35,8 +40,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -52,6 +59,10 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
private static final Logger logger = LoggerFactory.getLogger(NiFiSystemIT.class);
private final ConcurrentMap<String, Long> lastLogTimestamps = new ConcurrentHashMap<>();
private static final String QUEUE_SIZE_LOGGING_KEY = "Queue Sizes";
// Group ID | Source Name | Dest Name | Conn Name | Queue Size |
private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s | %2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";
public static final int CLIENT_API_PORT = 5671;
public static final int CLIENT_API_BASE_PORT = 5670;
public static final String NIFI_GROUP_ID = "org.apache.nifi";
@ -357,11 +368,89 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
final String sourceName = statusEntity.getConnectionStatus().getSourceName();
final String destinationName = statusEntity.getConnectionStatus().getDestinationName();
logEverySecond("Current Queue Size for Connection from {} to {} = {}, Waiting for {}", sourceName, destinationName, currentSize, queueSizeDescription);
logQueueSizesEveryMinute();
return test.test(currentSize);
final boolean matched = test.test(currentSize);
if (matched) {
resetQueueSizeLogging();
}
return matched;
});
}
private void resetQueueSizeLogging() {
lastLogTimestamps.remove(QUEUE_SIZE_LOGGING_KEY);
}
private void logQueueSizesEveryMinute() {
// If we haven't yet logged queue sizes, add entry
final Long lastLogTime = lastLogTimestamps.get(QUEUE_SIZE_LOGGING_KEY);
if (lastLogTime == null) {
lastLogTimestamps.put(QUEUE_SIZE_LOGGING_KEY, System.currentTimeMillis());
return;
}
// If it's not been at least 10 seconds, don't log again
if (System.currentTimeMillis() < lastLogTime + TimeUnit.SECONDS.toMillis(10)) {
return;
}
// Record the current time and log
lastLogTimestamps.put(QUEUE_SIZE_LOGGING_KEY, System.currentTimeMillis());
try {
logQueueSizes();
} catch (final Exception e) {
logger.warn("Attempted to obtain queue sizes for logging purposes but failed to obtain queue sizes", e);
}
}
private void logQueueSizes() throws NiFiClientException, IOException {
final ProcessGroupStatusEntity groupStatusEntity = getNifiClient().getFlowClient().getProcessGroupStatus("root", true);
final ProcessGroupStatusSnapshotDTO groupStatusDto = groupStatusEntity.getProcessGroupStatus().getAggregateSnapshot();
final List<ConnectionStatusSnapshotEntity> connectionStatuses = new ArrayList<>();
gatherConnectionStatuses(groupStatusDto, connectionStatuses);
logger.info("Dump of Queue Sizes:");
final String headerLine = String.format(QUEUE_SIZES_FORMAT,
"Group ID",
"Source Name",
"Destination Name",
"Connection Name",
"Queued");
logger.info(headerLine);
for (final ConnectionStatusSnapshotEntity connectionStatus : connectionStatuses) {
final ConnectionStatusSnapshotDTO statusSnapshotDto = connectionStatus.getConnectionStatusSnapshot();
if (statusSnapshotDto == null) {
continue;
}
final String formatted = String.format(QUEUE_SIZES_FORMAT,
statusSnapshotDto.getGroupId(),
statusSnapshotDto.getSourceName(),
statusSnapshotDto.getDestinationName(),
statusSnapshotDto.getName(),
statusSnapshotDto.getQueued());
logger.info(formatted);
}
}
private void gatherConnectionStatuses(final ProcessGroupStatusSnapshotDTO groupStatusSnapshotDto, final List<ConnectionStatusSnapshotEntity> connectionStatuses) {
if (groupStatusSnapshotDto == null) {
return;
}
connectionStatuses.addAll(groupStatusSnapshotDto.getConnectionStatusSnapshots());
for (final ProcessGroupStatusSnapshotEntity childStatusEntity : groupStatusSnapshotDto.getProcessGroupStatusSnapshots()) {
gatherConnectionStatuses(childStatusEntity.getProcessGroupStatusSnapshot(), connectionStatuses);
}
}
private void logEverySecond(final String message, final Object... args) {
final Long lastLogTime = lastLogTimestamps.get(message);
if (lastLogTime == null || lastLogTime < System.currentTimeMillis() - 1000L) {

View File

@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.TemplatesEntity;
@ -135,4 +136,12 @@ public interface FlowClient {
* @return the status for the connection
*/
ConnectionStatusEntity getConnectionStatus(String connectionId, boolean nodewise) throws NiFiClientException, IOException;
/**
* Retrieves the status for the process group with the given ID
* @param groupId the id of the process group
* @param recursive whether or not to recurse into sub groups
* @return the status for the process group
*/
ProcessGroupStatusEntity getProcessGroupStatus(String groupId, boolean recursive) throws NiFiClientException, IOException;
}

View File

@ -31,6 +31,7 @@ import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.TemplatesEntity;
@ -263,4 +264,15 @@ public class JerseyFlowClient extends AbstractJerseyClient implements FlowClient
return getRequestBuilder(target).get(ConnectionStatusEntity.class);
});
}
@Override
public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) throws NiFiClientException, IOException {
return executeAction("Error retrieving ProcessGroup status", () -> {
final WebTarget target = flowTarget.path("/process-groups/{groupId}/status")
.resolveTemplate("groupId", groupId)
.queryParam("recursive", recursive);
return getRequestBuilder(target).get(ProcessGroupStatusEntity.class);
});
}
}