NIFI-4863: Bug fixes to the way that we handled Remote Group Ports when changing flow version / reverting local changes

- Everywhere that we ignore adding remote ports we should ignore removing remote ports as well in flow diffs

This closes #2462.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2018-02-09 14:09:23 -05:00 committed by Bryan Bende
parent 336d3cf1f2
commit 61c6f0305b
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
10 changed files with 138 additions and 63 deletions

View File

@ -3270,11 +3270,6 @@ public final class StandardProcessGroup implements ProcessGroup {
continue;
}
// Ignore differences for adding a remote port
if (FlowDifferenceFilters.isAddedRemotePort(diff)) {
continue;
}
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
@ -3909,9 +3904,17 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final RemoteProcessGroup rpg = rpgOption.get();
return rpg.getInputPorts().stream()
final Optional<RemoteGroupPort> portByIdOption = rpg.getInputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
if (portByIdOption.isPresent()) {
return portByIdOption.get();
}
return rpg.getInputPorts().stream()
.filter(component -> connectableComponent.getName().equals(component.getName()))
.findAny()
.orElse(null);
}
@ -3928,9 +3931,17 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final RemoteProcessGroup rpg = rpgOption.get();
return rpg.getOutputPorts().stream()
final Optional<RemoteGroupPort> portByIdOption = rpg.getOutputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
if (portByIdOption.isPresent()) {
return portByIdOption.get();
}
return rpg.getOutputPorts().stream()
.filter(component -> connectableComponent.getName().equals(component.getName()))
.findAny()
.orElse(null);
}
@ -4216,7 +4227,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.collect(Collectors.toCollection(HashSet::new));
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);

View File

@ -672,6 +672,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
port.setBatchDuration(descriptor.getBatchDuration());
}
port.setVersionedComponentId(descriptor.getVersionedComponentId());
return port;
} finally {

View File

@ -29,12 +29,12 @@ public class FlowDifferenceFilters {
/**
* Predicate that returns true if the difference is NOT a remote port being added, and false if it is.
*/
public static Predicate<FlowDifference> FILTER_ADDED_REMOTE_PORTS = (fd) -> {
return !isAddedRemotePort(fd);
public static Predicate<FlowDifference> FILTER_ADDED_REMOVED_REMOTE_PORTS = (fd) -> {
return !isAddedOrRemovedRemotePort(fd);
};
public static boolean isAddedRemotePort(final FlowDifference fd) {
if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
public static boolean isAddedOrRemovedRemotePort(final FlowDifference fd) {
if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED || fd.getDifferenceType() == DifferenceType.COMPONENT_REMOVED) {
VersionedComponent component = fd.getComponentA();
if (component == null || fd.getComponentB() instanceof InstantiatedVersionedComponent) {
component = fd.getComponentB();

View File

@ -35,7 +35,7 @@ public class TestFlowDifferenceFilters {
DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, "");
// predicate should return false because we don't want to include changes for adding a remote input port
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
}
@Test
@ -47,7 +47,7 @@ public class TestFlowDifferenceFilters {
DifferenceType.COMPONENT_ADDED, remoteGroupPort, null, null, null, "");
// predicate should return false because we don't want to include changes for adding a remote input port
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
}
@Test
@ -59,7 +59,7 @@ public class TestFlowDifferenceFilters {
DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, null, "");
// predicate should return false because we don't want to include changes for adding a remote input port
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
}
@Test
@ -71,6 +71,6 @@ public class TestFlowDifferenceFilters {
DifferenceType.COMPONENT_ADDED, null, versionedProcessor, null, null, "");
// predicate should return true because we do want to include changes for adding a non-port
Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
}
}

View File

@ -1117,6 +1117,18 @@ public interface NiFiServiceFacade {
*/
RemoteProcessGroupEntity createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO);
/**
* Determines whether or not the Remote Port with the given remotePortId is connected (i.e., there are incoming Connections if it's an Input Port
* or there are outgoing Connections if it's an Output Port).
*
* @param remoteProcessGroupId the ID of the Remote Process Group
* @param remotePortId the ID of the Port
* @return <code>true</code> if remote port identified is connected, <code>false</code> if the port is not connected
*
* @throws ResourceNotFoundException if the port cannot be found
*/
boolean isRemoteGroupPortConnected(String remoteProcessGroupId, String remotePortId);
/**
* Gets a remote process group.
*

View File

@ -1853,6 +1853,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities);
}
@Override
public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) {
final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
RemoteGroupPort port = rpg.getInputPort(remotePortId);
if (port != null) {
return port.hasIncomingConnection();
}
port = rpg.getOutputPort(remotePortId);
if (port != null) {
return !port.getConnections().isEmpty();
}
throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId);
}
@Override
public void verifyCanAddTemplate(String groupId, String name) {
templateDAO.verifyCanAddTemplate(name, groupId);
@ -3966,7 +3982,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.map(difference -> {
final VersionedComponent localComponent = difference.getComponentA();
@ -4004,7 +4020,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
// Ignore differences for adding remote ports
if (FlowDifferenceFilters.isAddedRemotePort(difference)) {
if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
continue;
}
@ -4105,12 +4121,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
for (final Connectable connectable : connectables) {
final Optional<String> versionedId = connectable.getVersionedComponentId();
if (!versionedId.isPresent()) {
continue;
final Optional<String> versionedIdOption = connectable.getVersionedComponentId();
// Determine the Versioned ID by using the ID that is assigned, if one is. Otherwise,
// we will calculate the Versioned ID. This allows us to map connectables that currently are not under
// version control. We have to do this so that if we are changing flow versions and have a component that is running and it does not exist
// in the Versioned Flow, we still need to be able to create an AffectedComponentDTO for it.
final String versionedId;
if (versionedIdOption.isPresent()) {
versionedId = versionedIdOption.get();
} else {
versionedId = UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
}
final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId.get(), key -> new ArrayList<>());
final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId, key -> new ArrayList<>());
byVersionedId.add(connectable);
}
}
@ -4128,9 +4152,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final AffectedComponentDTO dto = new AffectedComponentDTO();
dto.setId(connectable.getIdentifier());
dto.setReferenceType(connectable.getConnectableType().name());
dto.setProcessGroupId(connectable.getProcessGroupIdentifier());
dto.setState(connectable.getScheduledState().name());
final String groupId = connectable instanceof RemoteGroupPort ? ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : connectable.getProcessGroupIdentifier();
dto.setProcessGroupId(groupId);
entity.setComponent(dto);
return entity;
}

View File

@ -17,12 +17,22 @@
package org.apache.nifi.web.api;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.Authorizer;
@ -75,21 +85,6 @@ import org.apache.nifi.web.util.LifecycleManagementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -105,6 +100,13 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
@Path("/versions")
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
public class VersionsResource extends ApplicationResource {
@ -1491,6 +1493,36 @@ public class VersionsResource extends ApplicationResource {
// Step 14. Restart all components
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
// If there are any Remote Group Ports that are supposed to be started and have no connections, we want to remove those from our Set.
// This will happen if the Remote Group Port is transmitting when the version change happens but the new flow version does not have
// a connection to the port. In such a case, the Port still is included in the Updated Entities because we do not remove them
// when updating the flow (they are removed in the background).
final Set<AffectedComponentEntity> avoidStarting = new HashSet<>();
for (final AffectedComponentEntity componentEntity : componentsToStart) {
final AffectedComponentDTO componentDto = componentEntity.getComponent();
final String referenceType = componentDto.getReferenceType();
if (!AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(referenceType)
&& !AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT.equals(referenceType)) {
continue;
}
boolean startComponent;
try {
startComponent = serviceFacade.isRemoteGroupPortConnected(componentDto.getProcessGroupId(), componentDto.getId());
} catch (final ResourceNotFoundException rnfe) {
// Could occur if RPG is refreshed at just the right time.
startComponent = false;
}
// We must add the components to avoid starting to a separate Set and then remove them below,
// rather than removing the component here, because doing so would result in a ConcurrentModificationException.
if (!startComponent) {
avoidStarting.add(componentEntity);
}
}
componentsToStart.removeAll(avoidStarting);
final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(startComponentsPause::cancel);
logger.info("Restarting {} Processors", componentsToStart.size());

View File

@ -1535,6 +1535,7 @@ public final class DtoFactory {
final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO();
dto.setId(port.getIdentifier());
dto.setGroupId(port.getRemoteProcessGroup().getIdentifier());
dto.setTargetId(port.getTargetIdentifier());
dto.setName(port.getName());
dto.setComments(port.getComments());
@ -2227,7 +2228,7 @@ public final class DtoFactory {
}
// Ignore differences for adding remote ports
if (FlowDifferenceFilters.isAddedRemotePort(difference)) {
if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
continue;
}

View File

@ -167,12 +167,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
connectable.getProcessGroup().startOutputPort((Port) connectable);
break;
case REMOTE_INPUT_PORT:
final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId);
remoteInputPort.getRemoteProcessGroup().startTransmitting(remoteInputPort);
break;
case REMOTE_OUTPUT_PORT:
final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId);
remoteOutputPort.getRemoteProcessGroup().startTransmitting(remoteOutputPort);
final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
remotePort.getRemoteProcessGroup().startTransmitting(remotePort);
break;
}
} else {
@ -188,12 +185,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
connectable.getProcessGroup().stopOutputPort((Port) connectable);
break;
case REMOTE_INPUT_PORT:
final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId);
remoteInputPort.getRemoteProcessGroup().stopTransmitting(remoteInputPort);
break;
case REMOTE_OUTPUT_PORT:
final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId);
remoteOutputPort.getRemoteProcessGroup().stopTransmitting(remoteOutputPort);
final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
remotePort.getRemoteProcessGroup().stopTransmitting(remotePort);
break;
}
}

View File

@ -18,11 +18,6 @@ package org.apache.nifi.web.dao.impl;
import static org.apache.nifi.util.StringUtils.isEmpty;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
@ -39,12 +34,14 @@ import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO {
private static final Logger logger = LoggerFactory.getLogger(StandardRemoteProcessGroupDAO.class);
private FlowController flowController;
private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) {
@ -129,6 +126,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
/**
* Verifies the specified remote group can be updated, if necessary.
*/
@SuppressWarnings("unchecked")
private void verifyUpdate(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDto) {
// see if the remote process group can start/stop transmitting
if (isNotNull(remoteProcessGroupDto.isTransmitting())) {