NIFI-10052 Avoid obtaining any locks when creating/sending heartbeats (#6298)

This commit is contained in:
Hsin-Ying Lee 2022-08-18 02:05:55 +08:00 committed by GitHub
parent 5df6efa0f2
commit 2685856c62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 36 additions and 32 deletions

View File

@ -182,8 +182,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, Port> inputPorts = new HashMap<>();
private final Map<String, Port> outputPorts = new HashMap<>();
private final Map<String, Connection> connections = new HashMap<>();
private final Map<String, ProcessGroup> processGroups = new HashMap<>();
private final Map<String, Connection> connections = new ConcurrentHashMap<>();
private final Map<String, ProcessGroup> processGroups = new ConcurrentHashMap<>();
private final Map<String, Label> labels = new HashMap<>();
private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
private final Map<String, ProcessorNode> processors = new HashMap<>();
@ -4255,6 +4255,24 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
@Override
public QueueSize getQueueSize() {
int count = 0;
long contentSize = 0L;
for (final ProcessGroup childGroup : processGroups.values()) {
final QueueSize queueSize = childGroup.getQueueSize();
count += queueSize.getObjectCount();
contentSize += queueSize.getByteCount();
}
for (final Connection connection : connections.values()) {
final QueueSize queueSize = connection.getFlowFileQueue().size();
count += queueSize.getObjectCount();
contentSize += queueSize.getByteCount();
}
return new QueueSize(count, contentSize);
}
@Override
public String getDefaultBackPressureDataSizeThreshold() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flowfile.FlowFile;
@ -1227,4 +1228,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param defaultBackPressureDataSizeThreshold new default back pressure size threshold (must include size unit label)
*/
void setDefaultBackPressureDataSizeThreshold(String defaultBackPressureDataSizeThreshold);
/**
* @return the QueueSize of this Process Group and all child Process Groups
*/
QueueSize getQueueSize();
}

View File

@ -373,7 +373,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
private boolean clustered;
// guarded by rwLock
private NodeConnectionStatus connectionStatus;
private volatile NodeConnectionStatus connectionStatus;
private StatusAnalyticsEngine analyticsEngine;
@ -2136,27 +2136,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return resetValue;
}
//
// Access to controller status
//
public QueueSize getTotalFlowFileCount(final ProcessGroup group) {
int count = 0;
long contentSize = 0L;
for (final Connection connection : group.getConnections()) {
final QueueSize size = connection.getFlowFileQueue().size();
count += size.getObjectCount();
contentSize += size.getByteCount();
}
for (final ProcessGroup childGroup : group.getProcessGroups()) {
final QueueSize size = getTotalFlowFileCount(childGroup);
count += size.getObjectCount();
contentSize += size.getByteCount();
}
return new QueueSize(count, contentSize);
}
public class GroupStatusCounts {
private int queuedCount = 0;
private long queuedContentSize = 0;
@ -3031,12 +3010,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
try {
HeartbeatBean bean = heartbeatBeanRef.get();
if (bean == null) {
readLock.lock();
try {
bean = new HeartbeatBean(flowManager.getRootGroup(), isPrimary());
} finally {
readLock.unlock("createHeartbeatMessage");
}
}
// create heartbeat payload
@ -3045,7 +3019,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
hbPayload.setActiveThreadCount(getActiveThreadCount());
hbPayload.setRevisionUpdateCount(revisionManager.getRevisionUpdateCount());
final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
final QueueSize queueSize = bean.getRootGroup().getQueueSize();
hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
hbPayload.setClusterStatus(clusterCoordinator.getConnectionStatuses());

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.groups.BatchCounts;
@ -853,6 +854,11 @@ public class MockProcessGroup implements ProcessGroup {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}
@Override
public QueueSize getQueueSize() {
return null;
}
@Override
public void terminateProcessor(ProcessorNode processor) {
}

View File

@ -68,7 +68,7 @@ public class NaiveRevisionManager implements RevisionManager {
}
@Override
public synchronized long getRevisionUpdateCount() {
public long getRevisionUpdateCount() {
return revisionUpdateCounter.get();
}