NIFI-13031 Changed PG StatusSnapshotDTO to use cloned copy for accurate status

This closes #8946

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nissim Shiman 2024-05-31 17:49:22 +00:00 committed by exceptionfactory
parent 31e5141349
commit 22f700f476
No known key found for this signature in database
8 changed files with 41 additions and 9 deletions

View File

@ -321,6 +321,7 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
other.setQueuedSize(getQueuedSize()); other.setQueuedSize(getQueuedSize());
other.setPercentUseBytes(getPercentUseBytes()); other.setPercentUseBytes(getPercentUseBytes());
other.setPercentUseCount(getPercentUseCount()); other.setPercentUseCount(getPercentUseCount());
other.setFlowFileAvailability(getFlowFileAvailability());
return other; return other;
} }

View File

@ -563,11 +563,32 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
other.setProcessingNanos(getProcessingNanos()); other.setProcessingNanos(getProcessingNanos());
other.setConnectionStatusSnapshots(copy(getConnectionStatusSnapshots())); if (connectionStatusSnapshots != null) {
other.setProcessorStatusSnapshots(copy(getProcessorStatusSnapshots())); final List<ConnectionStatusSnapshotEntity> collectionStatusSnapshotEntities = new ArrayList<>();
other.setRemoteProcessGroupStatusSnapshots(copy(getRemoteProcessGroupStatusSnapshots())); for (final ConnectionStatusSnapshotEntity connectionStatusSnapshotEntity : connectionStatusSnapshots) {
other.setInputPortStatusSnapshots(copy(getInputPortStatusSnapshots())); collectionStatusSnapshotEntities.add(connectionStatusSnapshotEntity.clone());
other.setOutputPortStatusSnapshots(copy(getOutputPortStatusSnapshots())); }
other.setConnectionStatusSnapshots(collectionStatusSnapshotEntities);
}
if (processorStatusSnapshots != null) {
final List<ProcessorStatusSnapshotEntity> processorStatusSnapshotEntities = new ArrayList<>();
for (final ProcessorStatusSnapshotEntity processorStatusSnapshotEntity : processorStatusSnapshots) {
processorStatusSnapshotEntities.add(processorStatusSnapshotEntity.clone());
}
other.setProcessorStatusSnapshots(processorStatusSnapshotEntities);
}
if (remoteProcessGroupStatusSnapshots != null) {
final List<RemoteProcessGroupStatusSnapshotEntity> remoteProcessGroupStatusSnapshotEntities = new ArrayList<>();
for (final RemoteProcessGroupStatusSnapshotEntity remoteProcessGroupStatusSnapshotEntity : remoteProcessGroupStatusSnapshots) {
remoteProcessGroupStatusSnapshotEntities.add(remoteProcessGroupStatusSnapshotEntity.clone());
}
other.setRemoteProcessGroupStatusSnapshots(remoteProcessGroupStatusSnapshotEntities);
}
other.setInputPortStatusSnapshots(copyPortStatusSnapshots(inputPortStatusSnapshots));
other.setOutputPortStatusSnapshots(copyPortStatusSnapshots(outputPortStatusSnapshots));
if (processGroupStatusSnapshots != null) { if (processGroupStatusSnapshots != null) {
final List<ProcessGroupStatusSnapshotEntity> childGroups = new ArrayList<>(); final List<ProcessGroupStatusSnapshotEntity> childGroups = new ArrayList<>();
@ -580,11 +601,15 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
return other; return other;
} }
private <T> Collection<T> copy(final Collection<T> original) { private Collection<PortStatusSnapshotEntity> copyPortStatusSnapshots(Collection<PortStatusSnapshotEntity> portStatusSnapshots) {
if (original == null) { if (portStatusSnapshots != null) {
return null; final List<PortStatusSnapshotEntity> portStatusSnapshotEntities = new ArrayList<>();
for (final PortStatusSnapshotEntity portStatusSnapshotEntity : portStatusSnapshots) {
portStatusSnapshotEntities.add(portStatusSnapshotEntity.clone());
}
return portStatusSnapshotEntities;
} }
return new ArrayList<T>(original); return null;
} }
} }

View File

@ -67,6 +67,7 @@ public class ConnectionStatisticsSnapshotEntity extends Entity implements Readab
@Override @Override
public ConnectionStatisticsSnapshotEntity clone() { public ConnectionStatisticsSnapshotEntity clone() {
final ConnectionStatisticsSnapshotEntity other = new ConnectionStatisticsSnapshotEntity(); final ConnectionStatisticsSnapshotEntity other = new ConnectionStatisticsSnapshotEntity();
other.setId(this.getId());
other.setCanRead(this.getCanRead()); other.setCanRead(this.getCanRead());
other.setConnectionStatisticsSnapshot(this.getConnectionStatisticsSnapshot().clone()); other.setConnectionStatisticsSnapshot(this.getConnectionStatisticsSnapshot().clone());

View File

@ -66,6 +66,7 @@ public class ConnectionStatusSnapshotEntity extends Entity implements ReadablePe
@Override @Override
public ConnectionStatusSnapshotEntity clone() { public ConnectionStatusSnapshotEntity clone() {
final ConnectionStatusSnapshotEntity other = new ConnectionStatusSnapshotEntity(); final ConnectionStatusSnapshotEntity other = new ConnectionStatusSnapshotEntity();
other.setId(this.getId());
other.setCanRead(this.getCanRead()); other.setCanRead(this.getCanRead());
other.setConnectionStatusSnapshot(this.getConnectionStatusSnapshot().clone()); other.setConnectionStatusSnapshot(this.getConnectionStatusSnapshot().clone());

View File

@ -67,6 +67,7 @@ public class PortStatusSnapshotEntity extends Entity implements ReadablePermissi
@Override @Override
public PortStatusSnapshotEntity clone() { public PortStatusSnapshotEntity clone() {
final PortStatusSnapshotEntity other = new PortStatusSnapshotEntity(); final PortStatusSnapshotEntity other = new PortStatusSnapshotEntity();
other.setId(this.getId());
other.setCanRead(this.getCanRead()); other.setCanRead(this.getCanRead());
other.setPortStatusSnapshot(this.getPortStatusSnapshot().clone()); other.setPortStatusSnapshot(this.getPortStatusSnapshot().clone());

View File

@ -66,6 +66,7 @@ public class ProcessGroupStatusSnapshotEntity extends Entity implements Readable
@Override @Override
public ProcessGroupStatusSnapshotEntity clone() { public ProcessGroupStatusSnapshotEntity clone() {
final ProcessGroupStatusSnapshotEntity other = new ProcessGroupStatusSnapshotEntity(); final ProcessGroupStatusSnapshotEntity other = new ProcessGroupStatusSnapshotEntity();
other.setId(this.getId());
other.setCanRead(this.getCanRead()); other.setCanRead(this.getCanRead());
other.setProcessGroupStatusSnapshot(this.getProcessGroupStatusSnapshot().clone()); other.setProcessGroupStatusSnapshot(this.getProcessGroupStatusSnapshot().clone());

View File

@ -66,6 +66,7 @@ public class ProcessorStatusSnapshotEntity extends Entity implements ReadablePer
@Override @Override
public ProcessorStatusSnapshotEntity clone() { public ProcessorStatusSnapshotEntity clone() {
final ProcessorStatusSnapshotEntity other = new ProcessorStatusSnapshotEntity(); final ProcessorStatusSnapshotEntity other = new ProcessorStatusSnapshotEntity();
other.setId(this.getId());
other.setCanRead(this.getCanRead()); other.setCanRead(this.getCanRead());
other.setProcessorStatusSnapshot(this.getProcessorStatusSnapshot().clone()); other.setProcessorStatusSnapshot(this.getProcessorStatusSnapshot().clone());

View File

@ -67,6 +67,7 @@ public class RemoteProcessGroupStatusSnapshotEntity extends Entity implements Re
@Override @Override
public RemoteProcessGroupStatusSnapshotEntity clone() { public RemoteProcessGroupStatusSnapshotEntity clone() {
final RemoteProcessGroupStatusSnapshotEntity other = new RemoteProcessGroupStatusSnapshotEntity(); final RemoteProcessGroupStatusSnapshotEntity other = new RemoteProcessGroupStatusSnapshotEntity();
other.setId(this.getId());
other.setCanRead(this.getCanRead()); other.setCanRead(this.getCanRead());
other.setRemoteProcessGroupStatusSnapshot(this.getRemoteProcessGroupStatusSnapshot().clone()); other.setRemoteProcessGroupStatusSnapshot(this.getRemoteProcessGroupStatusSnapshot().clone());