NIFI-4841 Fixing NPE when reverting local changes involving remote group ports. This closes #2454.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bryan Bende 2018-02-06 17:43:59 -05:00 committed by Mark Payne
parent 25e0bbb68d
commit 34b678d30d
7 changed files with 169 additions and 6 deletions

View File

@ -110,6 +110,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.SnippetUtils;
@ -1649,6 +1650,18 @@ public final class StandardProcessGroup implements ProcessGroup {
return funnel;
}
for (final RemoteProcessGroup remoteProcessGroup : group.getRemoteProcessGroups()) {
final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(identifier);
if (remoteInputPort != null) {
return remoteInputPort;
}
final RemoteGroupPort remoteOutputPort = remoteProcessGroup.getOutputPort(identifier);
if (remoteOutputPort != null) {
return remoteOutputPort;
}
}
for (final ProcessGroup childGroup : group.getProcessGroups()) {
final Connectable childGroupConnectable = findLocalConnectable(identifier, childGroup);
if (childGroupConnectable != null) {
@ -3257,6 +3270,11 @@ public final class StandardProcessGroup implements ProcessGroup {
continue;
}
// Ignore differences for adding a remote port
if (FlowDifferenceFilters.isAddedRemotePort(diff)) {
continue;
}
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
@ -3901,7 +3919,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final String rpgId = connectableComponent.getGroupId();
final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.findAny();
if (!rpgOption.isPresent()) {
@ -4198,6 +4216,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS)
.collect(Collectors.toCollection(HashSet::new));
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import java.util.function.Predicate;
public class FlowDifferenceFilters {
/**
* Predicate that returns true if the difference is NOT a remote port being added, and false if it is.
*/
public static Predicate<FlowDifference> FILTER_ADDED_REMOTE_PORTS = (fd) -> {
return !isAddedRemotePort(fd);
};
public static boolean isAddedRemotePort(final FlowDifference fd) {
if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
VersionedComponent component = fd.getComponentA();
if (component == null || fd.getComponentB() instanceof InstantiatedVersionedComponent) {
component = fd.getComponentB();
}
if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT
|| component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.StandardFlowDifference;
import org.junit.Assert;
import org.junit.Test;
public class TestFlowDifferenceFilters {
@Test
public void testFilterAddedRemotePortsWithRemoteInputPortAsComponentB() {
VersionedRemoteGroupPort remoteGroupPort = new VersionedRemoteGroupPort();
remoteGroupPort.setComponentType(ComponentType.REMOTE_INPUT_PORT);
StandardFlowDifference flowDifference = new StandardFlowDifference(
DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, "");
// predicate should return false because we don't want to include changes for adding a remote input port
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
}
@Test
public void testFilterAddedRemotePortsWithRemoteInputPortAsComponentA() {
VersionedRemoteGroupPort remoteGroupPort = new VersionedRemoteGroupPort();
remoteGroupPort.setComponentType(ComponentType.REMOTE_INPUT_PORT);
StandardFlowDifference flowDifference = new StandardFlowDifference(
DifferenceType.COMPONENT_ADDED, remoteGroupPort, null, null, null, "");
// predicate should return false because we don't want to include changes for adding a remote input port
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
}
@Test
public void testFilterAddedRemotePortsWithRemoteOutputPort() {
VersionedRemoteGroupPort remoteGroupPort = new VersionedRemoteGroupPort();
remoteGroupPort.setComponentType(ComponentType.REMOTE_OUTPUT_PORT);
StandardFlowDifference flowDifference = new StandardFlowDifference(
DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, "");
// predicate should return false because we don't want to include changes for adding a remote input port
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
}
@Test
public void testFilterAddedRemotePortsWithNonRemoteInputPort() {
VersionedProcessor versionedProcessor = new VersionedProcessor();
versionedProcessor.setComponentType(ComponentType.PROCESSOR);
StandardFlowDifference flowDifference = new StandardFlowDifference(
DifferenceType.COMPONENT_ADDED, null, versionedProcessor, null, null, "");
// predicate should return true because we do want to include changes for adding a non-port
Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
}
}

View File

@ -122,6 +122,7 @@ import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
@ -3965,6 +3966,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS)
.map(difference -> {
final VersionedComponent localComponent = difference.getComponentA();
@ -4001,6 +4003,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
continue;
}
// Ignore differences for adding remote ports
if (FlowDifferenceFilters.isAddedRemotePort(difference)) {
continue;
}
final VersionedComponent localComponent = difference.getComponentA();
if (localComponent == null) {
continue;

View File

@ -1539,7 +1539,9 @@ public class VersionsResource extends ApplicationResource {
for (final AffectedComponentEntity original : originalEntities) {
try {
final AffectedComponentEntity updatedEntity = AffectedComponentUtils.updateEntity(original, serviceFacade, dtoFactory, user);
if (updatedEntity != null) {
entities.add(updatedEntity);
}
} catch (final ResourceNotFoundException rnfe) {
// Component was removed. Just continue on without adding anything to the entities.
// We do this because the intent is to get updated versions of the entities with current

View File

@ -139,6 +139,7 @@ import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.Revision;
@ -2225,6 +2226,11 @@ public final class DtoFactory {
continue;
}
// Ignore differences for adding remote ports
if (FlowDifferenceFilters.isAddedRemotePort(difference)) {
continue;
}
final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());

View File

@ -17,8 +17,6 @@
package org.apache.nifi.web.util;
import java.util.Optional;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
@ -31,6 +29,8 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import java.util.Optional;
public class AffectedComponentUtils {
public static AffectedComponentEntity updateEntity(final AffectedComponentEntity componentEntity, final NiFiServiceFacade serviceFacade,
@ -81,4 +81,5 @@ public class AffectedComponentUtils {
return null;
}
}