NIFI-427: Allow Process Groups to be deleted as long as no incoming or outgoing connections (unless they are also selected), all components stopped, no FlowFiles queued

This commit is contained in:
Mark Payne 2015-06-05 12:59:15 -04:00
parent 8d1536ed24
commit 5c1afc0897
2 changed files with 75 additions and 24 deletions

View File

@ -641,6 +641,17 @@ public interface ProcessGroup {
void verifyCanDelete(); void verifyCanDelete();
/**
* Ensures that the ProcessGroup is eligible to be deleted.
*
* @param ignorePortConnections if true, the Connections that are currently connected to Ports
* will be ignored. Otherwise, the ProcessGroup is not eligible for deletion if its input ports
* or output ports have any connections
*
* @throws IllegalStateException if the ProcessGroup is not eligible for deletion
*/
void verifyCanDelete(boolean ignorePortConnections);
void verifyCanStart(); void verifyCanStart();
void verifyCanStop(); void verifyCanStop();

View File

@ -243,7 +243,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped, return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped,
invalid, disabled, activeRemotePorts, inactiveRemotePorts); invalid, disabled, activeRemotePorts, inactiveRemotePorts);
} }
@Override @Override
@ -548,9 +548,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override @Override
public void removeProcessGroup(final ProcessGroup group) { public void removeProcessGroup(final ProcessGroup group) {
if (!requireNonNull(group).isEmpty()) { requireNonNull(group).verifyCanDelete();
throw new IllegalStateException("Cannot remove " + group + " because it is not empty");
}
writeLock.lock(); writeLock.lock();
try { try {
@ -558,7 +556,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (toRemove == null) { if (toRemove == null) {
throw new IllegalStateException(group + " is not a member of this Process Group"); throw new IllegalStateException(group + " is not a member of this Process Group");
} }
verifyCanRemove(toRemove); toRemove.verifyCanDelete();
processGroups.remove(group.getIdentifier()); processGroups.remove(group.getIdentifier());
@ -568,12 +566,6 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
} }
private void verifyCanRemove(final ProcessGroup childGroup) {
if (!childGroup.isEmpty()) {
throw new IllegalStateException("Cannot remove ProcessGroup because it is not empty");
}
}
@Override @Override
public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) { public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) {
writeLock.lock(); writeLock.lock();
@ -801,11 +793,11 @@ public final class StandardProcessGroup implements ProcessGroup {
} else if (isInputPort(destination)) { } else if (isInputPort(destination)) {
if (!processGroups.containsKey(destinationGroup.getIdentifier())) { if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input " throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input "
+ "Port but the Input Port does not belong to a child Process Group"); + "Port but the Input Port does not belong to a child Process Group");
} }
} else if (destinationGroup != this) { } else if (destinationGroup != this) {
throw new IllegalStateException("Cannot add Connection between " + source + " and " + destination throw new IllegalStateException("Cannot add Connection between " + source + " and " + destination
+ " because they are in different Process Groups and neither is an Input Port or Output Port"); + " because they are in different Process Groups and neither is an Input Port or Output Port");
} }
} }
@ -968,7 +960,7 @@ public final class StandardProcessGroup implements ProcessGroup {
readLock.lock(); readLock.lock();
try { try {
return inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty() return inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty()
&& processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty(); && processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty();
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -1638,7 +1630,7 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final Connection conn : connectable.getIncomingConnections()) { for (final Connection conn : connectable.getIncomingConnections()) {
if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) { if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) {
throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections " throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections "
+ "that are not selected to be deleted"); + "that are not selected to be deleted");
} }
} }
} }
@ -1646,9 +1638,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// verify that all of the ProcessGroups in the snippet are empty // verify that all of the ProcessGroups in the snippet are empty
for (final String groupId : snippet.getProcessGroups()) { for (final String groupId : snippet.getProcessGroups()) {
final ProcessGroup toRemove = getProcessGroup(groupId); final ProcessGroup toRemove = getProcessGroup(groupId);
if (!toRemove.isEmpty()) { toRemove.verifyCanDelete(true);
throw new IllegalStateException("Process Group with name " + toRemove.getName() + " cannot be removed because it is not empty");
}
} }
for (final String id : connectionIdsToRemove) { for (final String id : connectionIdsToRemove) {
@ -1666,15 +1656,15 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final String id : replaceNullWithEmptySet(snippet.getLabels())) { for (final String id : replaceNullWithEmptySet(snippet.getLabels())) {
removeLabel(labels.get(id)); removeLabel(labels.get(id));
} }
for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
removeProcessGroup(processGroups.get(id));
}
for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) {
removeProcessor(processors.get(id)); removeProcessor(processors.get(id));
} }
for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
removeRemoteProcessGroup(remoteGroups.get(id)); removeRemoteProcessGroup(remoteGroups.get(id));
} }
for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
removeProcessGroup(processGroups.get(id));
}
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -1850,8 +1840,58 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override @Override
public void verifyCanDelete() { public void verifyCanDelete() {
if (!isEmpty()) { verifyCanDelete(false);
throw new IllegalStateException(this + " is not empty"); }
@Override
public void verifyCanDelete(final boolean ignoreConnections) {
readLock.lock();
try {
for (final Port port : inputPorts.values()) {
port.verifyCanDelete(true);
}
for (final Port port : outputPorts.values()) {
port.verifyCanDelete(true);
}
for (final ProcessorNode procNode : processors.values()) {
procNode.verifyCanDelete(true);
}
for (final Connection connection : connections.values()) {
connection.verifyCanDelete();
}
for (final ProcessGroup childGroup : processGroups.values()) {
childGroup.verifyCanDelete();
}
if (!ignoreConnections) {
for (final Port port : inputPorts.values()) {
for (final Connection connection : port.getIncomingConnections()) {
if (connection.getSource().equals(port)) {
connection.verifyCanDelete();
} else {
throw new IllegalStateException("Cannot delete Process Group because Input Port " + port +
" has at least one incoming connection from a component outside of the Process Group. Delete this connection first.");
}
}
}
for (final Port port : outputPorts.values()) {
for (final Connection connection : port.getConnections()) {
if (connection.getDestination().equals(port)) {
connection.verifyCanDelete();
} else {
throw new IllegalStateException("Cannot delete Process Group because Output Port " + port +
" has at least one outgoing connection to a component outside of the Process Group. Delete this connection first.");
}
}
}
}
} finally {
readLock.unlock();
} }
} }
@ -1936,7 +1976,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (group == null) { if (group == null) {
throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup"); throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup");
} }
group.verifyCanDelete(); group.verifyCanDelete(true);
} }
for (final String id : snippet.getProcessors()) { for (final String id : snippet.getProcessors()) {