From 5c1afc0897a939bcde803c07216be91b6c4d0c03 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 5 Jun 2015 12:59:15 -0400 Subject: [PATCH] NIFI-427: Allow Process Groups to be deleted as long as no incoming or outgoing connections (unless they are also selected), all components stopped, no FlowFiles queued --- .../org/apache/nifi/groups/ProcessGroup.java | 11 +++ .../nifi/groups/StandardProcessGroup.java | 88 ++++++++++++++----- 2 files changed, 75 insertions(+), 24 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index a9cfb58599..0e2a3f9099 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -641,6 +641,17 @@ public interface ProcessGroup { void verifyCanDelete(); + /** + * Ensures that the ProcessGroup is eligible to be deleted. + * + * @param ignorePortConnections if true, the Connections that are currently connected to Ports + * will be ignored. Otherwise, the ProcessGroup is not eligible for deletion if its input ports + * or output ports have any connections + * + * @throws IllegalStateException if the ProcessGroup is not eligible for deletion + */ + void verifyCanDelete(boolean ignorePortConnections); + void verifyCanStart(); void verifyCanStop(); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 9fbfb5d0c4..07e3e91f01 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -243,7 +243,7 @@ public final class StandardProcessGroup implements ProcessGroup { } return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped, - invalid, disabled, activeRemotePorts, inactiveRemotePorts); + invalid, disabled, activeRemotePorts, inactiveRemotePorts); } @Override @@ -548,9 +548,7 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public void removeProcessGroup(final ProcessGroup group) { - if (!requireNonNull(group).isEmpty()) { - throw new IllegalStateException("Cannot remove " + group + " because it is not empty"); - } + requireNonNull(group).verifyCanDelete(); writeLock.lock(); try { @@ -558,7 +556,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (toRemove == null) { throw new IllegalStateException(group + " is not a member of this Process Group"); } - verifyCanRemove(toRemove); + toRemove.verifyCanDelete(); processGroups.remove(group.getIdentifier()); @@ -568,12 +566,6 @@ public final class StandardProcessGroup implements ProcessGroup { } } - private void verifyCanRemove(final ProcessGroup childGroup) { - if (!childGroup.isEmpty()) { - throw new IllegalStateException("Cannot remove ProcessGroup because it is not empty"); - } - } - @Override public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) { writeLock.lock(); @@ -801,11 +793,11 @@ public final class StandardProcessGroup implements ProcessGroup { } else if (isInputPort(destination)) { if (!processGroups.containsKey(destinationGroup.getIdentifier())) { throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input " - + "Port but the Input Port does not belong to a child Process Group"); + + "Port but the Input Port does not belong to a child Process Group"); } } else if (destinationGroup != this) { throw new IllegalStateException("Cannot add Connection between " + source + " and " + destination - + " because they are in different Process Groups and neither is an Input Port or Output Port"); + + " because they are in different Process Groups and neither is an Input Port or Output Port"); } } @@ -968,7 +960,7 @@ public final class StandardProcessGroup implements ProcessGroup { readLock.lock(); try { return inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty() - && processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty(); + && processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty(); } finally { readLock.unlock(); } @@ -1638,7 +1630,7 @@ public final class StandardProcessGroup implements ProcessGroup { for (final Connection conn : connectable.getIncomingConnections()) { if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) { throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections " - + "that are not selected to be deleted"); + + "that are not selected to be deleted"); } } } @@ -1646,9 +1638,7 @@ public final class StandardProcessGroup implements ProcessGroup { // verify that all of the ProcessGroups in the snippet are empty for (final String groupId : snippet.getProcessGroups()) { final ProcessGroup toRemove = getProcessGroup(groupId); - if (!toRemove.isEmpty()) { - throw new IllegalStateException("Process Group with name " + toRemove.getName() + " cannot be removed because it is not empty"); - } + toRemove.verifyCanDelete(true); } for (final String id : connectionIdsToRemove) { @@ -1666,15 +1656,15 @@ public final class StandardProcessGroup implements ProcessGroup { for (final String id : replaceNullWithEmptySet(snippet.getLabels())) { removeLabel(labels.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) { - removeProcessGroup(processGroups.get(id)); - } for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { removeProcessor(processors.get(id)); } for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { removeRemoteProcessGroup(remoteGroups.get(id)); } + for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) { + removeProcessGroup(processGroups.get(id)); + } } finally { writeLock.unlock(); } @@ -1850,8 +1840,58 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public void verifyCanDelete() { - if (!isEmpty()) { - throw new IllegalStateException(this + " is not empty"); + verifyCanDelete(false); + } + + @Override + public void verifyCanDelete(final boolean ignoreConnections) { + readLock.lock(); + try { + for (final Port port : inputPorts.values()) { + port.verifyCanDelete(true); + } + + for (final Port port : outputPorts.values()) { + port.verifyCanDelete(true); + } + + for (final ProcessorNode procNode : processors.values()) { + procNode.verifyCanDelete(true); + } + + for (final Connection connection : connections.values()) { + connection.verifyCanDelete(); + } + + for (final ProcessGroup childGroup : processGroups.values()) { + childGroup.verifyCanDelete(); + } + + if (!ignoreConnections) { + for (final Port port : inputPorts.values()) { + for (final Connection connection : port.getIncomingConnections()) { + if (connection.getSource().equals(port)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException("Cannot delete Process Group because Input Port " + port + + " has at least one incoming connection from a component outside of the Process Group. Delete this connection first."); + } + } + } + + for (final Port port : outputPorts.values()) { + for (final Connection connection : port.getConnections()) { + if (connection.getDestination().equals(port)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException("Cannot delete Process Group because Output Port " + port + + " has at least one outgoing connection to a component outside of the Process Group. Delete this connection first."); + } + } + } + } + } finally { + readLock.unlock(); } } @@ -1936,7 +1976,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (group == null) { throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup"); } - group.verifyCanDelete(); + group.verifyCanDelete(true); } for (final String id : snippet.getProcessors()) {