NIFI-3705:

- Using consistent logic when verifying connection creation. Removed some unecessary checks as verification has been changed to run in cluster and standalone mode.

This closes #1672.
This commit is contained in:
Matt Gilman 2017-04-13 15:58:42 -04:00 committed by Mark Payne
parent 220a870a1e
commit 7e7f9a5deb
1 changed files with 26 additions and 27 deletions

View File

@ -287,14 +287,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceConnectableDTO.getType())) { if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceConnectableDTO.getType())) {
final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId); final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId);
final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceConnectableDTO.getGroupId()); final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceConnectableDTO.getGroupId());
final RemoteGroupPort remoteOutputPort = remoteProcessGroup.getOutputPort(sourceConnectableDTO.getId()); source = remoteProcessGroup.getOutputPort(sourceConnectableDTO.getId());
// ensure the remote port actually exists
if (!remoteOutputPort.getTargetExists()) {
throw new IllegalArgumentException("The specified remote output port does not exist.");
} else {
source = remoteOutputPort;
}
} else { } else {
final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceConnectableDTO.getGroupId()); final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceConnectableDTO.getGroupId());
source = sourceGroup.getConnectable(sourceConnectableDTO.getId()); source = sourceGroup.getConnectable(sourceConnectableDTO.getId());
@ -305,19 +298,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationConnectableDTO.getType())) { if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationConnectableDTO.getType())) {
final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId); final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationConnectableDTO.getGroupId()); final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationConnectableDTO.getGroupId());
destination = remoteProcessGroup.getInputPort(destinationConnectableDTO.getId());
if (remoteProcessGroup == null) {
throw new IllegalArgumentException("Unable to find the specified remote process group.");
}
final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(destinationConnectableDTO.getId());
// ensure the remote port actually exists
if (!remoteInputPort.getTargetExists()) {
throw new IllegalArgumentException("The specified remote input port does not exist.");
} else {
destination = remoteInputPort;
}
} else { } else {
final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationConnectableDTO.getGroupId()); final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationConnectableDTO.getGroupId());
destination = destinationGroup.getConnectable(destinationConnectableDTO.getId()); destination = destinationGroup.getConnectable(destinationConnectableDTO.getId());
@ -391,27 +372,45 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
throw new IllegalArgumentException("Cannot create connection without specifying destination"); throw new IllegalArgumentException("Cannot create connection without specifying destination");
} }
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDto.getType())) { if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDto.getType())) {
final Connectable sourceConnectable = rootGroup.findRemoteGroupPort(sourceDto.getId()); final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId);
final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceDto.getGroupId());
if (remoteProcessGroup == null) {
throw new IllegalArgumentException("Unable to find the specified remote process group.");
}
final RemoteGroupPort sourceConnectable = remoteProcessGroup.getOutputPort(sourceDto.getId());
if (sourceConnectable == null) { if (sourceConnectable == null) {
throw new IllegalArgumentException("The specified source for the connection does not exist"); throw new IllegalArgumentException("The specified source for the connection does not exist");
} else if (!sourceConnectable.getTargetExists()) {
throw new IllegalArgumentException("The specified remote output port does not exist.");
} }
} else { } else {
final Connectable sourceConnectable = rootGroup.findLocalConnectable(sourceDto.getId()); final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceDto.getGroupId());
final Connectable sourceConnectable = sourceGroup.getConnectable(sourceDto.getId());
if (sourceConnectable == null) { if (sourceConnectable == null) {
throw new IllegalArgumentException("The specified source for the connection does not exist"); throw new IllegalArgumentException("The specified source for the connection does not exist");
} }
} }
if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) { if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) {
final Connectable destinationConnectable = rootGroup.findRemoteGroupPort(destinationDto.getId()); final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationDto.getGroupId());
if (remoteProcessGroup == null) {
throw new IllegalArgumentException("Unable to find the specified remote process group.");
}
final RemoteGroupPort destinationConnectable = remoteProcessGroup.getInputPort(destinationDto.getId());
if (destinationConnectable == null) { if (destinationConnectable == null) {
throw new IllegalArgumentException("The specified destination for the connection does not exist"); throw new IllegalArgumentException("The specified destination for the connection does not exist");
} else if (!destinationConnectable.getTargetExists()) {
throw new IllegalArgumentException("The specified remote input port does not exist.");
} }
} else { } else {
final Connectable destinationConnectable = rootGroup.findLocalConnectable(destinationDto.getId()); final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationDto.getGroupId());
final Connectable destinationConnectable = destinationGroup.getConnectable(destinationDto.getId());
if (destinationConnectable == null) { if (destinationConnectable == null) {
throw new IllegalArgumentException("The specified destination for the connection does not exist"); throw new IllegalArgumentException("The specified destination for the connection does not exist");
} }