NIFI-11251: Fixed case in which changing Flow Version was improperly attempting to use a Temporary Funnel destination for a connection due to differences in Group ID by instead checking if the destination is reachable; fixed issue in which processors in an inner versioned group were not stopped even though they changed, when a higher-level parent (that was itself under version control) had its version changed. Wrote system tests to verify

- Fixed system tests so that they work properly in Clustered version of RegistryClientIT
- Fixed system test - ensure that we wait for processors to become valid before attempting to start them; also added an additional system test around Controller Services in versioned flows

This closes #7095

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-03-28 15:23:26 -04:00 committed by exceptionfactory
parent dde89c0b15
commit 23d6d6ede4
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 212 additions and 21 deletions

View File

@ -680,18 +680,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
continue;
}
// If the Connection's destination didn't change, nothing to do
// If the Connection's destination didn't change, and the new is still reachable, nothing to do
final String destinationVersionId = connection.getDestination().getVersionedComponentId().orElse(null);
final String proposedDestinationId = proposedConnection.getDestination().getId();
final String destinationGroupVersionId = connection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
if (Objects.equals(destinationVersionId, proposedDestinationId) && Objects.equals(destinationGroupVersionId, proposedDestinationGroupId)) {
Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
final boolean newDestinationReachableFromSource = isConnectionDestinationReachable(connection.getSource(), newDestination);
if (Objects.equals(destinationVersionId, proposedDestinationId) && newDestinationReachableFromSource) {
continue;
}
// Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added),
// we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again.
Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
final boolean useTempDestination = isTempDestinationNecessary(connection, proposedConnection, newDestination);
if (useTempDestination) {
final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup());
@ -706,6 +706,29 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
return connectionsWithTempDestination;
}
/**
* Checks if a Connection can be made from the given source component to the given destination component
* @param source the source component
* @param destination the destination component
* @return true if the connection is allowable, <code>false</code> if the connection cannot be made due to the Process Group hierarchies
*/
private boolean isConnectionDestinationReachable(final Connectable source, final Connectable destination) {
if (source == null || destination == null) {
return false;
}
// If the source is an Output Port, the destination must be in the parent group, unless the destination is the Input Port of another group
if (source.getConnectableType() == ConnectableType.OUTPUT_PORT) {
if (destination.getConnectableType() == ConnectableType.INPUT_PORT) {
return Objects.equals(source.getProcessGroup().getParent(), destination.getProcessGroup().getParent());
}
return Objects.equals(source.getProcessGroup().getParent(), destination.getProcessGroup());
}
return Objects.equals(source.getProcessGroup(), destination.getProcessGroup());
}
private boolean isTempDestinationNecessary(final Connection existingConnection, final VersionedConnection proposedConnection, final Connectable newDestination) {
if (newDestination == null) {
return true;
@ -996,18 +1019,6 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
}
private Set<Relationship> getAutoTerminatedRelationships(final ProcessorNode processor, final VersionedProcessor proposedProcessor) {
final Set<String> relationshipNames = proposedProcessor.getAutoTerminatedRelationships();
if (relationshipNames == null) {
return Collections.emptySet();
}
return relationshipNames.stream()
.map(processor::getRelationship)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
}
private void synchronizeRemoteGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> rpgsByVersionedId) {
for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier());

View File

@ -5447,7 +5447,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor(),
Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW);
Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP);
final FlowComparison comparison = flowComparator.compare();
final FlowManager flowManager = controllerFacade.getFlowManager();

View File

@ -54,6 +54,7 @@ public class StandardCountService extends AbstractControllerService implements C
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final long startValue = Long.parseLong(context.getProperty(START_VALUE).getValue());
getLogger().info("Setting counter to {}", startValue);
counter.set(startValue);
}

View File

@ -1597,9 +1597,16 @@ public class NiFiClientUtil {
public VersionControlInformationEntity startVersionControl(final ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final String bucketId, final String flowName)
throws NiFiClientException, IOException{
return publishFlowVersion(group, registryClient, bucketId, flowName, null);
}
private VersionControlInformationEntity publishFlowVersion(final ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final String bucketId, final String flowName,
final String flowId) throws NiFiClientException, IOException{
final VersionedFlowDTO versionedFlowDto = new VersionedFlowDTO();
versionedFlowDto.setBucketId(bucketId);
versionedFlowDto.setFlowName(flowName);
versionedFlowDto.setFlowId(flowId);
versionedFlowDto.setRegistryId(registryClient.getId());
versionedFlowDto.setAction(VersionedFlowDTO.COMMIT_ACTION);
@ -1610,6 +1617,15 @@ public class NiFiClientUtil {
return nifiClient.getVersionsClient().startVersionControl(group.getId(), requestEntity);
}
public VersionControlInformationEntity saveFlowVersion(final ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final VersionControlInformationEntity currentVci)
throws NiFiClientException, IOException {
final VersionControlInformationDTO currentDto = currentVci.getVersionControlInformation();
return publishFlowVersion(group, registryClient, currentDto.getBucketId(), currentDto.getFlowName(), currentDto.getFlowId());
}
public VersionedFlowUpdateRequestEntity revertChanges(final ProcessGroupEntity group) throws NiFiClientException, IOException, InterruptedException {
final VersionControlInformationEntity vciEntity = nifiClient.getVersionsClient().getVersionControlInfo(group.getId());
final VersionedFlowUpdateRequestEntity revertRequest = nifiClient.getVersionsClient().initiateRevertFlowVersion(group.getId(), vciEntity);
@ -1655,6 +1671,12 @@ public class NiFiClientUtil {
}
public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version) throws NiFiClientException, IOException, InterruptedException {
return changeFlowVersion(processGroupId, version, true);
}
public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version, final boolean throwOnFailure)
throws NiFiClientException, IOException, InterruptedException {
final ProcessGroupEntity groupEntity = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId);
final ProcessGroupDTO groupDto = groupEntity.getComponent();
final VersionControlInformationDTO vciDto = groupDto.getVersionControlInformation();
@ -1669,14 +1691,18 @@ public class NiFiClientUtil {
requestEntity.setVersionControlInformation(vciDto);
final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().updateVersionControlInfo(processGroupId, requestEntity);
return waitForVersionFlowUpdateComplete(result.getRequest().getRequestId());
return waitForVersionFlowUpdateComplete(result.getRequest().getRequestId(), throwOnFailure);
}
public VersionedFlowUpdateRequestEntity waitForVersionFlowUpdateComplete(final String updateRequestId) throws NiFiClientException, IOException, InterruptedException {
public VersionedFlowUpdateRequestEntity waitForVersionFlowUpdateComplete(final String updateRequestId, final boolean throwOnFailure) throws NiFiClientException, IOException, InterruptedException {
while (true) {
final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().getUpdateRequest(updateRequestId);
final boolean complete = result.getRequest().isComplete();
if (complete) {
if (throwOnFailure && result.getRequest().getFailureReason() != null) {
throw new RuntimeException("Version Flow Update request failed due to: " + result.getRequest().getFailureReason());
}
return nifiClient.getVersionsClient().deleteUpdateRequest(updateRequestId);
}
@ -1739,4 +1765,10 @@ public class NiFiClientUtil {
return nifiClient.getProcessGroupClient().copySnippet(destinationGroupId, requestEntity);
}
public ConnectionEntity setFifoPrioritizer(final ConnectionEntity connectionEntity) throws NiFiClientException, IOException {
final ConnectionDTO connectionDto = connectionEntity.getComponent();
connectionDto.setPrioritizers(Collections.singletonList("org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer"));
return nifiClient.getConnectionClient().updateConnection(connectionEntity);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.tests.system.registry;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.tests.system.NiFiClientUtil;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@ -25,8 +26,11 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
@ -40,6 +44,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@ -52,6 +57,148 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RegistryClientIT extends NiFiSystemIT {
/**
* Test a scenario where we have Parent Process Group with a child process group. The child group is under Version Control.
* Then the parent is placed under Version Control. Then modify a Processor in child. Register snapshot for child, then for parent.
* Then start Flow.
* Then change between versions at the Parent level while the flow is stopped and while it's running.
*/
@Test
public void testChangeVersionOnParentThatCascadesToChild() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient();
final NiFiClientUtil util = getClientUtil();
final ProcessGroupEntity parent = util.createProcessGroup("Parent", "root");
final ProcessGroupEntity child = util.createProcessGroup("Child", parent.getId());
final PortEntity inputPort = util.createInputPort("Input Port", child.getId());
final PortEntity outputPort = util.createOutputPort("Output Port", child.getId());
final ProcessorEntity updateContents = util.createProcessor("UpdateContent", child.getId());
util.updateProcessorProperties(updateContents, Collections.singletonMap("Content", "Updated"));
util.createConnection(inputPort, updateContents);
util.createConnection(updateContents, outputPort, "success");
final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", parent.getId());
util.updateProcessorProperties(generate, Collections.singletonMap("Text", "Hello World"));
util.createConnection(generate, inputPort, "success");
final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", parent.getId());
final ConnectionEntity connectionToTerminate = util.createConnection(outputPort, terminate);
final VersionControlInformationEntity childVci = util.startVersionControl(child, clientEntity, "testChangeVersionOnParentThatCascadesToChild", "Child");
final VersionControlInformationEntity parentVci = util.startVersionControl(parent, clientEntity, "testChangeVersionOnParentThatCascadesToChild", "Parent");
// Change the properties of the UpdateContent processor and commit as v2
util.updateProcessorProperties(updateContents, Collections.singletonMap("Content", "Updated v2"));
util.saveFlowVersion(child, clientEntity, childVci);
util.saveFlowVersion(parent, clientEntity, parentVci);
// Ensure that we have the correct state
util.assertFlowUpToDate(parent.getId());
util.assertFlowUpToDate(child.getId());
// Verify that we are able to switch back to v1 while everything is stopped
util.changeFlowVersion(parent.getId(), 1);
util.assertFlowStaleAndUnmodified(parent.getId());
util.assertFlowStaleAndUnmodified(child.getId());
// Start the flow and verify the contents of the flow file
util.waitForValidProcessor(updateContents.getId());
util.waitForValidProcessor(generate.getId());
util.startProcessGroupComponents(child.getId());
util.startProcessor(generate);
waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
final String contents = util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), 0);
assertEquals("Updated", contents);
// Switch Version back to v2 while it's running
util.changeFlowVersion(parent.getId(), 2);
util.assertFlowUpToDate(parent.getId());
util.assertFlowUpToDate(child.getId());
// With flow running, change version to v1. Restart GenerateFlowFile to trigger another FlowFile to be generated
util.stopProcessor(generate);
util.startProcessor(generate);
waitForQueueCount(connectionToTerminate.getId(), 2 * getNumberOfNodes());
// Ensure that the contents are correct
final String secondFlowFileContents = util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), getNumberOfNodes());
assertEquals("Updated v2", secondFlowFileContents);
// Switch back to v1 while flow is running to verify that the version can change back to a lower version as well
util.changeFlowVersion(parent.getId(), 1);
util.assertFlowStaleAndUnmodified(parent.getId());
util.assertFlowStaleAndUnmodified(child.getId());
util.stopProcessor(generate);
util.startProcessor(generate);
waitForQueueCount(connectionToTerminate.getId(), 3 * getNumberOfNodes());
final String thirdFlowFileContents = util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), getNumberOfNodes() * 2);
assertEquals("Updated", thirdFlowFileContents);
}
@Test
public void testControllerServiceUpdateWhileRunning() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient();
final NiFiClientUtil util = getClientUtil();
final ProcessGroupEntity group = util.createProcessGroup("Parent", "root");
final ControllerServiceEntity service = util.createControllerService("StandardCountService", group.getId());
final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", group.getId());
final ProcessorEntity countProcessor = util.createProcessor("CountFlowFiles", group.getId());
util.updateProcessorProperties(countProcessor, Collections.singletonMap("Count Service", service.getComponent().getId()));
final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", group.getId());
final ConnectionEntity connectionToTerminate = util.createConnection(countProcessor, terminate, "success");
util.setFifoPrioritizer(connectionToTerminate);
util.createConnection(generate, countProcessor, "success");
// Save the flow as v1
final VersionControlInformationEntity vci = util.startVersionControl(group, clientEntity, "testControllerServiceUpdateWhileRunning", "Parent");
// Change the value of of the Controller Service's start value to 2000, and change the text of the GenerateFlowFile just to make it run each time the version is changed
util.updateControllerServiceProperties(service, Collections.singletonMap("Start Value", "2000"));
util.updateProcessorProperties(generate, Collections.singletonMap("Text", "Hello World"));
// Save the flow as v2
util.saveFlowVersion(group, clientEntity, vci);
// Change back to v1 and start the flow
util.changeFlowVersion(group.getId(), 1);
util.assertFlowStaleAndUnmodified(group.getId());
util.enableControllerService(service);
util.waitForValidProcessor(generate.getId());
util.startProcessor(generate);
util.waitForValidProcessor(countProcessor.getId());
util.startProcessor(countProcessor);
// Ensure that we get the expected result
waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
final Map<String, String> firstFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), 0).getFlowFile().getAttributes();
assertEquals("1", firstFlowFileAttributes.get("count"));
// Change to v2 and ensure that the output is correct
util.changeFlowVersion(group.getId(), 2);
util.assertFlowUpToDate(group.getId());
waitForQueueCount(connectionToTerminate.getId(), 2 * getNumberOfNodes());
final Map<String, String> secondFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes()).getFlowFile().getAttributes();
assertEquals("2001", secondFlowFileAttributes.get("count"));
// Change back to v1 and ensure that the output is correct. It should reset count back to 0.
util.changeFlowVersion(group.getId(), 1);
util.assertFlowStaleAndUnmodified(group.getId());
waitForQueueCount(connectionToTerminate.getId(), 3 * getNumberOfNodes());
final Map<String, String> thirdFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes() * 2).getFlowFile().getAttributes();
assertEquals("1", thirdFlowFileAttributes.get("count"));
}
@Test
public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
@ -107,7 +254,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2);
final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2, false);
final String failureReason = version2Result.getRequest().getFailureReason();
assertNotNull(failureReason);