From 34b678d30d25c930678b008a5db8a26a6a2938b3 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Tue, 6 Feb 2018 17:43:59 -0500 Subject: [PATCH] NIFI-4841 Fixing NPE when reverting local changes involving remote group ports. This closes #2454. Signed-off-by: Mark Payne --- .../nifi/groups/StandardProcessGroup.java | 25 +++++- .../nifi/util/FlowDifferenceFilters.java | 52 +++++++++++++ .../nifi/util/TestFlowDifferenceFilters.java | 76 +++++++++++++++++++ .../nifi/web/StandardNiFiServiceFacade.java | 7 ++ .../apache/nifi/web/api/VersionsResource.java | 4 +- .../apache/nifi/web/api/dto/DtoFactory.java | 6 ++ .../nifi/web/util/AffectedComponentUtils.java | 5 +- 7 files changed, 169 insertions(+), 6 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 15f2b5fa57..8b7dcd2a8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -110,6 +110,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.FlowDifferenceFilters; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.SnippetUtils; @@ -1649,6 +1650,18 @@ public final class StandardProcessGroup implements ProcessGroup { return funnel; } + for (final RemoteProcessGroup remoteProcessGroup : group.getRemoteProcessGroups()) { + final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(identifier); + if (remoteInputPort != null) { + return remoteInputPort; + } + + final RemoteGroupPort remoteOutputPort = remoteProcessGroup.getOutputPort(identifier); + if (remoteOutputPort != null) { + return remoteOutputPort; + } + } + for (final ProcessGroup childGroup : group.getProcessGroups()) { final Connectable childGroupConnectable = findLocalConnectable(identifier, childGroup); if (childGroupConnectable != null) { @@ -3257,6 +3270,11 @@ public final class StandardProcessGroup implements ProcessGroup { continue; } + // Ignore differences for adding a remote port + if (FlowDifferenceFilters.isAddedRemotePort(diff)) { + continue; + } + // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level // and if so compare our VersionedControllerService to the existing service. if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) { @@ -3901,7 +3919,7 @@ public final class StandardProcessGroup implements ProcessGroup { final String rpgId = connectableComponent.getGroupId(); final Optional rpgOption = group.getRemoteProcessGroups().stream() .filter(component -> component.getVersionedComponentId().isPresent()) - .filter(component -> id.equals(component.getVersionedComponentId().get())) + .filter(component -> rpgId.equals(component.getVersionedComponentId().get())) .findAny(); if (!rpgOption.isPresent()) { @@ -4197,8 +4215,9 @@ public final class StandardProcessGroup implements ProcessGroup { final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set differences = comparison.getDifferences().stream() - .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED) - .collect(Collectors.toCollection(HashSet::new)); + .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED) + .filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS) + .collect(Collectors.toCollection(HashSet::new)); LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences); return differences; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java new file mode 100644 index 0000000000..ca48b9929e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.registry.flow.ComponentType; +import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.diff.DifferenceType; +import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; + +import java.util.function.Predicate; + +public class FlowDifferenceFilters { + + /** + * Predicate that returns true if the difference is NOT a remote port being added, and false if it is. + */ + public static Predicate FILTER_ADDED_REMOTE_PORTS = (fd) -> { + return !isAddedRemotePort(fd); + }; + + public static boolean isAddedRemotePort(final FlowDifference fd) { + if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED) { + VersionedComponent component = fd.getComponentA(); + if (component == null || fd.getComponentB() instanceof InstantiatedVersionedComponent) { + component = fd.getComponentB(); + } + + if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT + || component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) { + return true; + } + } + + return false; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java new file mode 100644 index 0000000000..ee658165c0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.registry.flow.ComponentType; +import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.diff.DifferenceType; +import org.apache.nifi.registry.flow.diff.StandardFlowDifference; +import org.junit.Assert; +import org.junit.Test; + +public class TestFlowDifferenceFilters { + + @Test + public void testFilterAddedRemotePortsWithRemoteInputPortAsComponentB() { + VersionedRemoteGroupPort remoteGroupPort = new VersionedRemoteGroupPort(); + remoteGroupPort.setComponentType(ComponentType.REMOTE_INPUT_PORT); + + StandardFlowDifference flowDifference = new StandardFlowDifference( + DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, ""); + + // predicate should return false because we don't want to include changes for adding a remote input port + Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + } + + @Test + public void testFilterAddedRemotePortsWithRemoteInputPortAsComponentA() { + VersionedRemoteGroupPort remoteGroupPort = new VersionedRemoteGroupPort(); + remoteGroupPort.setComponentType(ComponentType.REMOTE_INPUT_PORT); + + StandardFlowDifference flowDifference = new StandardFlowDifference( + DifferenceType.COMPONENT_ADDED, remoteGroupPort, null, null, null, ""); + + // predicate should return false because we don't want to include changes for adding a remote input port + Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + } + + @Test + public void testFilterAddedRemotePortsWithRemoteOutputPort() { + VersionedRemoteGroupPort remoteGroupPort = new VersionedRemoteGroupPort(); + remoteGroupPort.setComponentType(ComponentType.REMOTE_OUTPUT_PORT); + + StandardFlowDifference flowDifference = new StandardFlowDifference( + DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, ""); + + // predicate should return false because we don't want to include changes for adding a remote input port + Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + } + + @Test + public void testFilterAddedRemotePortsWithNonRemoteInputPort() { + VersionedProcessor versionedProcessor = new VersionedProcessor(); + versionedProcessor.setComponentType(ComponentType.PROCESSOR); + + StandardFlowDifference flowDifference = new StandardFlowDifference( + DifferenceType.COMPONENT_ADDED, null, versionedProcessor, null, null, ""); + + // predicate should return true because we do want to include changes for adding a non-port + Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference)); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 65f6329f1f..bf794cf3f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -122,6 +122,7 @@ import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ComponentType; +import org.apache.nifi.util.FlowDifferenceFilters; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; @@ -3965,6 +3966,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Set affectedComponents = comparison.getDifferences().stream() .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow. .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED) + .filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS) .map(difference -> { final VersionedComponent localComponent = difference.getComponentA(); @@ -4001,6 +4003,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { continue; } + // Ignore differences for adding remote ports + if (FlowDifferenceFilters.isAddedRemotePort(difference)) { + continue; + } + final VersionedComponent localComponent = difference.getComponentA(); if (localComponent == null) { continue; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index d8c8ddf4d4..8028bd9df8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -1539,7 +1539,9 @@ public class VersionsResource extends ApplicationResource { for (final AffectedComponentEntity original : originalEntities) { try { final AffectedComponentEntity updatedEntity = AffectedComponentUtils.updateEntity(original, serviceFacade, dtoFactory, user); - entities.add(updatedEntity); + if (updatedEntity != null) { + entities.add(updatedEntity); + } } catch (final ResourceNotFoundException rnfe) { // Component was removed. Just continue on without adding anything to the entities. // We do this because the intent is to get updated versions of the entities with current diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 36da701785..2ac31fb361 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -139,6 +139,7 @@ import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.FlowDifferenceFilters; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.FlowModification; import org.apache.nifi.web.Revision; @@ -2225,6 +2226,11 @@ public final class DtoFactory { continue; } + // Ignore differences for adding remote ports + if (FlowDifferenceFilters.isAddedRemotePort(difference)) { + continue; + } + final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); final List differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java index f257bb1d21..05aa4f10f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java @@ -17,8 +17,6 @@ package org.apache.nifi.web.util; -import java.util.Optional; - import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.AffectedComponentDTO; @@ -31,6 +29,8 @@ import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import java.util.Optional; + public class AffectedComponentUtils { public static AffectedComponentEntity updateEntity(final AffectedComponentEntity componentEntity, final NiFiServiceFacade serviceFacade, @@ -81,4 +81,5 @@ public class AffectedComponentUtils { return null; } + }