NIFI-1777 Prevent deleting a connection going to a running processor

NIFI-1777 Added unit tests to test processor removal
This closes #357
This commit is contained in:
Pierre Villard 2016-04-15 16:49:47 +02:00 committed by Oleg Zhurakousky
parent 1a57b37dc9
commit f719cbf60c
2 changed files with 77 additions and 0 deletions

View File

@ -392,5 +392,12 @@ public final class StandardConnection implements Connection {
throw new IllegalStateException("Source of Connection (" + source + ") is running");
}
}
final Connectable dest = destination.get();
if (dest.isRunning()) {
if (!ConnectableType.FUNNEL.equals(dest.getConnectableType())) {
throw new IllegalStateException("Destination of Connection (" + dest + ") is running");
}
}
}
}

View File

@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@ -45,6 +47,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
@ -574,6 +577,73 @@ public class TestProcessorLifecycle {
fc.shutdown(true);
}
/**
* Test deletion of processor when connected to another
* @throws Exception exception
*/
@Test
public void validateProcessorDeletion() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNodeA.setProperty("P", "hello");
testGroup.addProcessor(testProcNodeA);
ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNodeB.setProperty("P", "hello");
testGroup.addProcessor(testProcNodeB);
Collection<String> relationNames = new ArrayList<String>();
relationNames.add("relation");
Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames);
testGroup.addConnection(connection);
ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNodeA);
ps.startProcessor(testProcNodeB);
try {
testGroup.removeProcessor(testProcNodeA);
fail();
} catch (Exception e) {
// should throw exception because processor running
}
try {
testGroup.removeProcessor(testProcNodeB);
fail();
} catch (Exception e) {
// should throw exception because processor running
}
ps.stopProcessor(testProcNodeB);
Thread.sleep(100);
try {
testGroup.removeProcessor(testProcNodeA);
fail();
} catch (Exception e) {
// should throw exception because destination processor running
}
try {
testGroup.removeProcessor(testProcNodeB);
fail();
} catch (Exception e) {
// should throw exception because source processor running
}
ps.stopProcessor(testProcNodeA);
Thread.sleep(100);
testGroup.removeProcessor(testProcNodeA);
testGroup.removeProcessor(testProcNodeB);
testGroup.shutdown();
fc.shutdown(true);
}
/**
* Scenario where onTrigger() is executed with random delay limited to
* 'delayLimit', yet with guaranteed exit from onTrigger().