From 61c6f0305bc7b5c68456fc2d40e0536698821c47 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 9 Feb 2018 14:09:23 -0500 Subject: [PATCH] NIFI-4863: Bug fixes to the way that we handled Remote Group Ports when changing flow version / reverting local changes - Everywhere that we ignore adding remote ports we should ignore removing remote ports as well in flow diffs This closes #2462. Signed-off-by: Bryan Bende --- .../nifi/groups/StandardProcessGroup.java | 27 +++++-- .../remote/StandardRemoteProcessGroup.java | 1 + .../nifi/util/FlowDifferenceFilters.java | 8 +- .../nifi/util/TestFlowDifferenceFilters.java | 8 +- .../apache/nifi/web/NiFiServiceFacade.java | 12 +++ .../nifi/web/StandardNiFiServiceFacade.java | 40 ++++++++-- .../apache/nifi/web/api/VersionsResource.java | 74 +++++++++++++------ .../apache/nifi/web/api/dto/DtoFactory.java | 3 +- .../web/dao/impl/StandardProcessGroupDAO.java | 14 +--- .../impl/StandardRemoteProcessGroupDAO.java | 14 ++-- 10 files changed, 138 insertions(+), 63 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 8b7dcd2a8c..ae9b43f345 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3270,11 +3270,6 @@ public final class StandardProcessGroup implements ProcessGroup { continue; } - // Ignore differences for adding a remote port - if (FlowDifferenceFilters.isAddedRemotePort(diff)) { - continue; - } - // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level // and if so compare our VersionedControllerService to the existing service. if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) { @@ -3909,9 +3904,17 @@ public final class StandardProcessGroup implements ProcessGroup { } final RemoteProcessGroup rpg = rpgOption.get(); - return rpg.getInputPorts().stream() + final Optional portByIdOption = rpg.getInputPorts().stream() .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny(); + + if (portByIdOption.isPresent()) { + return portByIdOption.get(); + } + + return rpg.getInputPorts().stream() + .filter(component -> connectableComponent.getName().equals(component.getName())) .findAny() .orElse(null); } @@ -3928,9 +3931,17 @@ public final class StandardProcessGroup implements ProcessGroup { } final RemoteProcessGroup rpg = rpgOption.get(); - return rpg.getOutputPorts().stream() + final Optional portByIdOption = rpg.getOutputPorts().stream() .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny(); + + if (portByIdOption.isPresent()) { + return portByIdOption.get(); + } + + return rpg.getOutputPorts().stream() + .filter(component -> connectableComponent.getName().equals(component.getName())) .findAny() .orElse(null); } @@ -4216,7 +4227,7 @@ public final class StandardProcessGroup implements ProcessGroup { final FlowComparison comparison = flowComparator.compare(); final Set differences = comparison.getDifferences().stream() .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED) - .filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS) + .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS) .collect(Collectors.toCollection(HashSet::new)); LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 0b9c6f29bd..c4621e68d2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -672,6 +672,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (!StringUtils.isBlank(descriptor.getBatchDuration())) { port.setBatchDuration(descriptor.getBatchDuration()); } + port.setVersionedComponentId(descriptor.getVersionedComponentId()); return port; } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java index ca48b9929e..4d341e97fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java @@ -29,12 +29,12 @@ public class FlowDifferenceFilters { /** * Predicate that returns true if the difference is NOT a remote port being added, and false if it is. */ - public static Predicate FILTER_ADDED_REMOTE_PORTS = (fd) -> { - return !isAddedRemotePort(fd); + public static Predicate FILTER_ADDED_REMOVED_REMOTE_PORTS = (fd) -> { + return !isAddedOrRemovedRemotePort(fd); }; - public static boolean isAddedRemotePort(final FlowDifference fd) { - if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED) { + public static boolean isAddedOrRemovedRemotePort(final FlowDifference fd) { + if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED || fd.getDifferenceType() == DifferenceType.COMPONENT_REMOVED) { VersionedComponent component = fd.getComponentA(); if (component == null || fd.getComponentB() instanceof InstantiatedVersionedComponent) { component = fd.getComponentB(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java index ee658165c0..9cc8b480e2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java @@ -35,7 +35,7 @@ public class TestFlowDifferenceFilters { DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, ""); // predicate should return false because we don't want to include changes for adding a remote input port - Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference)); } @Test @@ -47,7 +47,7 @@ public class TestFlowDifferenceFilters { DifferenceType.COMPONENT_ADDED, remoteGroupPort, null, null, null, ""); // predicate should return false because we don't want to include changes for adding a remote input port - Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference)); } @Test @@ -59,7 +59,7 @@ public class TestFlowDifferenceFilters { DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, ""); // predicate should return false because we don't want to include changes for adding a remote input port - Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference)); } @Test @@ -71,6 +71,6 @@ public class TestFlowDifferenceFilters { DifferenceType.COMPONENT_ADDED, null, versionedProcessor, null, null, ""); // predicate should return true because we do want to include changes for adding a non-port - Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference)); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 165af45a47..009b096f15 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1117,6 +1117,18 @@ public interface NiFiServiceFacade { */ RemoteProcessGroupEntity createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO); + /** + * Determines whether or not the Remote Port with the given remotePortId is connected (i.e., there are incoming Connections if it's an Input Port + * or there are outgoing Connections if it's an Output Port). + * + * @param remoteProcessGroupId the ID of the Remote Process Group + * @param remotePortId the ID of the Port + * @return true if remote port identified is connected, false if the port is not connected + * + * @throws ResourceNotFoundException if the port cannot be found + */ + boolean isRemoteGroupPortConnected(String remoteProcessGroupId, String remotePortId); + /** * Gets a remote process group. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index bf794cf3f4..f57f628cef 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -1853,6 +1853,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities); } + @Override + public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) { + final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); + RemoteGroupPort port = rpg.getInputPort(remotePortId); + if (port != null) { + return port.hasIncomingConnection(); + } + + port = rpg.getOutputPort(remotePortId); + if (port != null) { + return !port.getConnections().isEmpty(); + } + + throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId); + } + @Override public void verifyCanAddTemplate(String groupId, String name) { templateDAO.verifyCanAddTemplate(name, groupId); @@ -3966,7 +3982,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Set affectedComponents = comparison.getDifferences().stream() .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow. .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED) - .filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS) + .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS) .map(difference -> { final VersionedComponent localComponent = difference.getComponentA(); @@ -4004,7 +4020,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } // Ignore differences for adding remote ports - if (FlowDifferenceFilters.isAddedRemotePort(difference)) { + if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) { continue; } @@ -4105,12 +4121,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private void mapToConnectableId(final Collection connectables, final Map> destination) { for (final Connectable connectable : connectables) { - final Optional versionedId = connectable.getVersionedComponentId(); - if (!versionedId.isPresent()) { - continue; + final Optional versionedIdOption = connectable.getVersionedComponentId(); + + // Determine the Versioned ID by using the ID that is assigned, if one is. Otherwise, + // we will calculate the Versioned ID. This allows us to map connectables that currently are not under + // version control. We have to do this so that if we are changing flow versions and have a component that is running and it does not exist + // in the Versioned Flow, we still need to be able to create an AffectedComponentDTO for it. + final String versionedId; + if (versionedIdOption.isPresent()) { + versionedId = versionedIdOption.get(); + } else { + versionedId = UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString(); } - final List byVersionedId = destination.computeIfAbsent(versionedId.get(), key -> new ArrayList<>()); + final List byVersionedId = destination.computeIfAbsent(versionedId, key -> new ArrayList<>()); byVersionedId.add(connectable); } } @@ -4128,9 +4152,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AffectedComponentDTO dto = new AffectedComponentDTO(); dto.setId(connectable.getIdentifier()); dto.setReferenceType(connectable.getConnectableType().name()); - dto.setProcessGroupId(connectable.getProcessGroupIdentifier()); dto.setState(connectable.getScheduledState().name()); + final String groupId = connectable instanceof RemoteGroupPort ? ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : connectable.getProcessGroupIdentifier(); + dto.setProcessGroupId(groupId); + entity.setComponent(dto); return entity; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 8028bd9df8..7a96ebf50b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -17,12 +17,22 @@ package org.apache.nifi.web.api; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.Authorizer; @@ -75,21 +85,6 @@ import org.apache.nifi.web.util.LifecycleManagementException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -105,6 +100,13 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; + @Path("/versions") @Api(value = "/versions", description = "Endpoint for managing version control for a flow") public class VersionsResource extends ApplicationResource { @@ -1491,6 +1493,36 @@ public class VersionsResource extends ApplicationResource { // Step 14. Restart all components final Set componentsToStart = getUpdatedEntities(runningComponents, user); + + // If there are any Remote Group Ports that are supposed to be started and have no connections, we want to remove those from our Set. + // This will happen if the Remote Group Port is transmitting when the version change happens but the new flow version does not have + // a connection to the port. In such a case, the Port still is included in the Updated Entities because we do not remove them + // when updating the flow (they are removed in the background). + final Set avoidStarting = new HashSet<>(); + for (final AffectedComponentEntity componentEntity : componentsToStart) { + final AffectedComponentDTO componentDto = componentEntity.getComponent(); + final String referenceType = componentDto.getReferenceType(); + if (!AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(referenceType) + && !AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT.equals(referenceType)) { + continue; + } + + boolean startComponent; + try { + startComponent = serviceFacade.isRemoteGroupPortConnected(componentDto.getProcessGroupId(), componentDto.getId()); + } catch (final ResourceNotFoundException rnfe) { + // Could occur if RPG is refreshed at just the right time. + startComponent = false; + } + + // We must add the components to avoid starting to a separate Set and then remove them below, + // rather than removing the component here, because doing so would result in a ConcurrentModificationException. + if (!startComponent) { + avoidStarting.add(componentEntity); + } + } + componentsToStart.removeAll(avoidStarting); + final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); asyncRequest.setCancelCallback(startComponentsPause::cancel); logger.info("Restarting {} Processors", componentsToStart.size()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 2ac31fb361..359b5241b7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1535,6 +1535,7 @@ public final class DtoFactory { final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); dto.setId(port.getIdentifier()); + dto.setGroupId(port.getRemoteProcessGroup().getIdentifier()); dto.setTargetId(port.getTargetIdentifier()); dto.setName(port.getName()); dto.setComments(port.getComments()); @@ -2227,7 +2228,7 @@ public final class DtoFactory { } // Ignore differences for adding remote ports - if (FlowDifferenceFilters.isAddedRemotePort(difference)) { + if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) { continue; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index e1d9e69db1..4744324d22 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -167,12 +167,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou connectable.getProcessGroup().startOutputPort((Port) connectable); break; case REMOTE_INPUT_PORT: - final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId); - remoteInputPort.getRemoteProcessGroup().startTransmitting(remoteInputPort); - break; case REMOTE_OUTPUT_PORT: - final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId); - remoteOutputPort.getRemoteProcessGroup().startTransmitting(remoteOutputPort); + final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId); + remotePort.getRemoteProcessGroup().startTransmitting(remotePort); break; } } else { @@ -188,12 +185,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou connectable.getProcessGroup().stopOutputPort((Port) connectable); break; case REMOTE_INPUT_PORT: - final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId); - remoteInputPort.getRemoteProcessGroup().stopTransmitting(remoteInputPort); - break; case REMOTE_OUTPUT_PORT: - final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId); - remoteOutputPort.getRemoteProcessGroup().stopTransmitting(remoteOutputPort); + final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId); + remotePort.getRemoteProcessGroup().stopTransmitting(remotePort); break; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index 8228a7781b..05a983c120 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -18,11 +18,6 @@ package org.apache.nifi.web.dao.impl; import static org.apache.nifi.util.StringUtils.isEmpty; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.regex.Matcher; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; @@ -39,12 +34,14 @@ import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO { - private static final Logger logger = LoggerFactory.getLogger(StandardRemoteProcessGroupDAO.class); private FlowController flowController; private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) { @@ -129,6 +126,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot /** * Verifies the specified remote group can be updated, if necessary. */ + @SuppressWarnings("unchecked") private void verifyUpdate(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDto) { // see if the remote process group can start/stop transmitting if (isNotNull(remoteProcessGroupDto.isTransmitting())) {