NIFI-5142: Do not allow a connection's destination to be changed to a funnel if the source is the same funnel. Also fixed some typos in StandardFunnel. This closes #2669

This commit is contained in:
Mark Payne 2018-05-02 15:01:15 -04:00 committed by Matt Gilman
parent 0289ca7114
commit 18ad348107
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 7 additions and 3 deletions

View File

@ -190,7 +190,7 @@ public class StandardFunnel implements Funnel {
writeLock.lock();
try {
if (!outgoingConnections.remove(connection)) {
throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Funnel");
}
outgoingConnections.add(connection);
} finally {
@ -202,7 +202,7 @@ public class StandardFunnel implements Funnel {
writeLock.lock();
try {
if (!incomingConnections.remove(connection)) {
throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Funnel");
}
incomingConnections.add(connection);
} finally {
@ -218,7 +218,7 @@ public class StandardFunnel implements Funnel {
if (!requireNonNull(connection).getSource().equals(this)) {
final boolean existed = incomingConnections.remove(connection);
if (!existed) {
throw new IllegalStateException("The given connection is not currently registered for this ProcessorNode");
throw new IllegalStateException("The given connection is not currently registered for this Funnel");
}
return;
}

View File

@ -301,6 +301,10 @@ public final class StandardConnection implements Connection {
throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination);
}
if (newDestination instanceof Funnel && newDestination.equals(source)) {
throw new IllegalStateException("Funnels do not support self-looping connections.");
}
try {
previousDestination.removeConnection(this);
this.destination.set(newDestination);