NIFI-11192: Ensure that if ports moved between parent/child group in between flow versions that we can properly handle that. Added system tests to verify.

NIFI-11192: If a failure is encountered when changing the version of a flow from 1 version to another, attempt to rollback the changes instead of just failing with the flow in a bad state
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6981
This commit is contained in:
Mark Payne 2023-02-21 11:28:54 -05:00 committed by Matthew Burgess
parent 7b12545e5a
commit 87e61c50ee
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
17 changed files with 1178 additions and 45 deletions

View File

@ -683,18 +683,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// If the Connection's destination didn't change, nothing to do
final String destinationVersionId = connection.getDestination().getVersionedComponentId().orElse(null);
final String proposedDestinationId = proposedConnection.getDestination().getId();
if (Objects.equals(destinationVersionId, proposedDestinationId)) {
final String destinationGroupVersionId = connection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
if (Objects.equals(destinationVersionId, proposedDestinationId) && Objects.equals(destinationGroupVersionId, proposedDestinationGroupId)) {
continue;
}
// Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added),
// we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again.
Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
if (
newDestination == null
||
(newDestination.getConnectableType() == ConnectableType.OUTPUT_PORT && !newDestination.getProcessGroup().equals(connection.getProcessGroup()))
) {
final boolean useTempDestination = isTempDestinationNecessary(connection, proposedConnection, newDestination);
if (useTempDestination) {
final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup());
LOG.debug("Updated Connection {} to have a temporary destination of {}", connection, temporaryDestination);
newDestination = temporaryDestination;
@ -707,6 +706,36 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
return connectionsWithTempDestination;
}
private boolean isTempDestinationNecessary(final Connection existingConnection, final VersionedConnection proposedConnection, final Connectable newDestination) {
if (newDestination == null) {
return true;
}
// If the destination is an Input Port or an Output Port and the group changed, use a temp destination
final ConnectableType connectableType = newDestination.getConnectableType();
final boolean port = connectableType == ConnectableType.OUTPUT_PORT || connectableType == ConnectableType.INPUT_PORT;
final boolean groupChanged = !newDestination.getProcessGroup().equals(existingConnection.getProcessGroup());
if (port && groupChanged) {
return true;
}
// If the proposed destination has a different group than the existing group, use a temp destination.
final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
final String destinationGroupVersionedComponentId = existingConnection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
if (!Objects.equals(proposedDestinationGroupId, destinationGroupVersionedComponentId)) {
return true;
}
// If the proposed connection exists in a different group than the existing group, use a temp destination.
final String connectionGroupVersionedComponentId = existingConnection.getProcessGroup().getVersionedComponentId().orElse(null);
final String proposedGroupId = proposedConnection.getGroupIdentifier();
if (!Objects.equals(proposedGroupId, connectionGroupVersionedComponentId)) {
return true;
}
return false;
}
private Funnel getTemporaryFunnel(final ProcessGroup group) {
final String tempFunnelId = group.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX;
Funnel temporaryFunnel = context.getFlowManager().getFunnel(tempFunnelId);

View File

@ -4075,7 +4075,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final VersionControlInformation versionControlInfo = getVersionControlInformation();
if (versionControlInfo != null) {
if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getMetadata().getFlowIdentifier())) {
throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with. Currently synced to " +
"flow with ID " + versionControlInfo.getFlowIdentifier() + " but proposed flow's metadata shows flow identifier as " + updatedFlow.getMetadata().getFlowIdentifier());
}
if (verifyNotDirty) {

View File

@ -58,6 +58,7 @@ public class FlowDifferenceFilters {
public static boolean isEnvironmentalChange(final FlowDifference difference, final VersionedProcessGroup localGroup, final FlowManager flowManager) {
return difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED
|| isVariableValueChange(difference)
|| isSensitivePropertyDueToGhosting(difference, flowManager)
|| isAncestorVariableAdded(difference, flowManager)
|| isRpgUrlChange(difference)
|| isAddedOrRemovedRemotePort(difference)
@ -75,6 +76,43 @@ public class FlowDifferenceFilters {
|| isParameterContextChange(difference);
}
private static boolean isSensitivePropertyDueToGhosting(final FlowDifference difference, final FlowManager flowManager) {
if (difference.getDifferenceType() != DifferenceType.PROPERTY_SENSITIVITY_CHANGED) {
return false;
}
final String componentAId = difference.getComponentA().getInstanceIdentifier();
if (componentAId != null) {
final ComponentNode componentNode = getComponent(flowManager, difference.getComponentA().getComponentType(), componentAId);
if (componentNode != null && componentNode.isExtensionMissing()) {
return true;
}
}
final String componentBId = difference.getComponentB().getInstanceIdentifier();
if (componentBId != null) {
final ComponentNode componentNode = getComponent(flowManager, difference.getComponentA().getComponentType(), componentBId);
if (componentNode != null && componentNode.isExtensionMissing()) {
return true;
}
}
return false;
}
private static ComponentNode getComponent(final FlowManager flowManager, final ComponentType componentType, final String componentId) {
switch (componentType) {
case CONTROLLER_SERVICE:
return flowManager.getControllerServiceNode(componentId);
case PROCESSOR:
return flowManager.getProcessorNode(componentId);
case REPORTING_TASK:
return flowManager.getReportingTaskNode(componentId);
}
return null;
}
// The Registry URL may change if, for instance, a registry is moved to a new host, or is made secure, the port changes, etc.
// Since this can be handled by the client anyway, there's no need to flag this as a 'local modification'
private static boolean isRegistryUrlChange(final FlowDifference difference) {

View File

@ -28,10 +28,10 @@ import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.ResumeFlowException;
@ -53,6 +53,7 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupDescriptorEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.util.AffectedComponentUtils;
import org.apache.nifi.web.util.CancellableTimedPause;
import org.apache.nifi.web.util.ComponentLifecycle;
@ -375,47 +376,32 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
}
asyncRequest.markStepComplete();
// Get the Original Flow Snapshot in case we fail to update and need to rollback
final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(groupId);
final RegisteredFlowSnapshot originalFlowSnapshot = serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(), true);
try {
if (replicateRequest) {
// If replicating request, steps 9-11 are performed on each node individually
final URI replicateUri = buildUri(requestUri, replicateUriPath, null);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
URI replicateUri = null;
try {
replicateUri = new URI(requestUri.getScheme(), requestUri.getUserInfo(), requestUri.getHost(), requestUri.getPort(),
replicateUriPath, null, requestUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
// each concrete class creates its own type of entity for replication
final Entity replicateEntity = createReplicateUpdateFlowEntity(revision, requestEntity, flowSnapshot);
final NodeResponse clusterResponse;
try {
logger.debug("Replicating PUT request to {} for user {}", replicateUri, user);
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
final NodeResponse clusterResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, flowSnapshot);
verifyResponseCode(clusterResponse, replicateUri, user, "update");
} catch (final Exception e) {
if (originalFlowSnapshot == null) {
logger.debug("Failed to update flow but could not determine original flow to rollback to so will not make any attempt to revert the flow.");
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
try {
final NodeResponse rollbackResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, originalFlowSnapshot);
verifyResponseCode(rollbackResponse, replicateUri, user, "rollback");
} catch (final Exception inner) {
e.addSuppressed(inner);
}
}
} catch (final InterruptedException ie) {
logger.warn("Interrupted while replicating PUT request to {} for user {}", replicateUri, user);
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
}
final int updateFlowStatus = clusterResponse.getStatus();
if (updateFlowStatus != Status.OK.getStatusCode()) {
final String explanation = getResponseEntity(clusterResponse, String.class);
logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
replicateUri, user, updateFlowStatus, explanation);
throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
throw e;
}
} else {
// Step 9: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
@ -425,7 +411,27 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
// Step 10-11. Update Process Group to the new flow and update variable registry with any Variables that were added or removed.
// Each concrete class defines its own update flow functionality
performUpdateFlow(groupId, revision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
try {
performUpdateFlow(groupId, revision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
} catch (final Exception e) {
// If clustered, just throw the original Exception.
// Otherwise, rollback the flow update. We do not perform the rollback if clustered because
// we want this to be handled at a higher level, allowing the request to replace our flow version to come from the coordinator
// if any node fails to perform the update.
if (isClustered()) {
throw e;
}
// Rollback the update to the original flow snapshot. If there's any Exception, add it as a Suppressed Exception to the original so
// that it can be logged but not overtake the original Exception as the cause.
try {
performUpdateFlow(groupId, revision, requestEntity, originalFlowSnapshot, idGenerationSeed, false, true);
} catch (final Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}
} finally {
if (!asyncRequest.isCancelled()) {
@ -530,6 +536,53 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
asyncRequest.setCancelCallback(null);
}
private URI buildUri(final URI requestUri, final String path, final String query) {
try {
return new URI(requestUri.getScheme(), requestUri.getUserInfo(), requestUri.getHost(), requestUri.getPort(),
path, query, requestUri.getFragment());
} catch (final URISyntaxException e) {
throw new RuntimeException(e);
}
}
private void verifyResponseCode(final NodeResponse response, final URI uri, final NiFiUser user, final String actionDescription) throws LifecycleManagementException {
final int updateFlowStatus = response.getStatus();
if (updateFlowStatus != Status.OK.getStatusCode()) {
final String explanation = getResponseEntity(response, String.class);
logger.error("Failed to {} flow update across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
actionDescription, uri, user, updateFlowStatus, explanation);
throw new LifecycleManagementException("Failed to " + actionDescription + " flow on all nodes in cluster due to " + explanation);
}
}
private NodeResponse replicateFlowUpdateRequest(final URI replicateUri, final NiFiUser user, final T requestEntity, final Revision revision, final RegisteredFlowSnapshot flowSnapshot)
throws LifecycleManagementException {
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
// each concrete class creates its own type of entity for replication
final Entity replicateEntity = createReplicateUpdateFlowEntity(revision, requestEntity, flowSnapshot);
final NodeResponse clusterResponse;
try {
logger.debug("Replicating PUT request to {} for user {}", replicateUri, user);
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
logger.warn("Interrupted while replicating PUT request to {} for user {}", replicateUri, user);
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
}
return clusterResponse;
}
/**
* Get a list of steps to perform for upload flow
*/

View File

@ -48,6 +48,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@ -209,13 +210,59 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
final File versionDir = new File(flowDir, String.valueOf(version));
final File snapshotFile = new File(versionDir, "snapshot.json");
final Pattern intPattern = Pattern.compile("\\d+");
final File[] versionFiles = flowDir.listFiles(file -> intPattern.matcher(file.getName()).matches());
final JsonFactory factory = new JsonFactory(objectMapper);
try (final JsonParser parser = factory.createParser(snapshotFile)) {
final RegisteredFlowSnapshot snapshot = parser.readValueAs(RegisteredFlowSnapshot.class);
populateBucket(snapshot, bucketId);
populateFlow(snapshot, bucketId, flowId, version, versionFiles == null ? 0 : versionFiles.length);
return snapshot;
}
}
private void populateBucket(final RegisteredFlowSnapshot snapshot, final String bucketId) {
final FlowRegistryBucket existingBucket = snapshot.getBucket();
if (existingBucket != null) {
return;
}
final FlowRegistryBucket bucket = new FlowRegistryBucket();
bucket.setCreatedTimestamp(System.currentTimeMillis());
bucket.setIdentifier(bucketId);
bucket.setName(bucketId);
bucket.setPermissions(createAllowAllPermissions());
snapshot.setBucket(bucket);
snapshot.getSnapshotMetadata().setBucketIdentifier(bucketId);
}
private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bucketId, final String flowId, final int version, final int numVersions) {
final RegisteredFlow existingFlow = snapshot.getFlow();
if (existingFlow != null) {
return;
}
final RegisteredFlow flow = new RegisteredFlow();
flow.setCreatedTimestamp(System.currentTimeMillis());
flow.setLastModifiedTimestamp(System.currentTimeMillis());
flow.setBucketIdentifier(bucketId);
flow.setBucketName(bucketId);
flow.setIdentifier(flowId);
flow.setName(flowId);
flow.setPermissions(createAllowAllPermissions());
flow.setVersionCount(numVersions);
final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo();
versionInfo.setVersion(version);
flow.setVersionInfo(versionInfo);
snapshot.setFlow(flow);
snapshot.getSnapshotMetadata().setFlowIdentifier(flowId);
}
@Override
public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws IOException {
final File rootDir = getRootDirectory(context);

View File

@ -38,6 +38,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.DifferenceDTO;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.FlowRegistryClientDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@ -75,6 +76,7 @@ import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
@ -1652,6 +1654,74 @@ public class NiFiClientUtil {
return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, groupEntity);
}
public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version) throws NiFiClientException, IOException, InterruptedException {
final ProcessGroupEntity groupEntity = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId);
final ProcessGroupDTO groupDto = groupEntity.getComponent();
final VersionControlInformationDTO vciDto = groupDto.getVersionControlInformation();
if (vciDto == null) {
throw new IllegalArgumentException("Process Group with ID " + processGroupId + " is not under Version Control");
}
vciDto.setVersion(version);
final VersionControlInformationEntity requestEntity = new VersionControlInformationEntity();
requestEntity.setProcessGroupRevision(groupEntity.getRevision());
requestEntity.setVersionControlInformation(vciDto);
final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().updateVersionControlInfo(processGroupId, requestEntity);
return waitForVersionFlowUpdateComplete(result.getRequest().getRequestId());
}
public VersionedFlowUpdateRequestEntity waitForVersionFlowUpdateComplete(final String updateRequestId) throws NiFiClientException, IOException, InterruptedException {
while (true) {
final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().getUpdateRequest(updateRequestId);
final boolean complete = result.getRequest().isComplete();
if (complete) {
return nifiClient.getVersionsClient().deleteUpdateRequest(updateRequestId);
}
logger.debug("Waiting for Version Flow Update request to complete...");
Thread.sleep(100L);
}
}
public void assertFlowStaleAndUnmodified(final String processGroupId) throws NiFiClientException, IOException {
final String state = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId).getVersionedFlowState();
if ("STALE".equalsIgnoreCase(state)) {
return;
}
if ("LOCALLY_MODIFIED_AND_STALE".equalsIgnoreCase(state)) {
final FlowComparisonEntity flowComparisonEntity = nifiClient.getProcessGroupClient().getLocalModifications(processGroupId);
final String differences = flowComparisonEntity.getComponentDifferences().stream()
.flatMap(dto -> dto.getDifferences().stream())
.map(DifferenceDTO::getDifference)
.collect(Collectors.joining("\n"));
throw new AssertionError("Expected state to be STALE but was " + state + " with the following modifications:\n" + differences);
}
throw new AssertionError("Expected state to be STALE but was " + state);
}
public void assertFlowUpToDate(final String processGroupId) throws NiFiClientException, IOException {
final String state = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId).getVersionedFlowState();
if ("UP_TO_DATE".equalsIgnoreCase(state)) {
return;
}
if ("LOCALLY_MODIFIED".equalsIgnoreCase(state)) {
final FlowComparisonEntity flowComparisonEntity = nifiClient.getProcessGroupClient().getLocalModifications(processGroupId);
final String differences = flowComparisonEntity.getComponentDifferences().stream()
.flatMap(dto -> dto.getDifferences().stream())
.map(DifferenceDTO::getDifference)
.collect(Collectors.joining("\n"));
throw new AssertionError("Expected state to be UP_TO_DATE but was LOCALLY_MODIFIED with the following modifications:\n" + differences);
}
throw new AssertionError("Expected state to be UP_TO_DATE but was " + state);
}
public FlowEntity copyAndPaste(final ProcessGroupEntity pgEntity, final String destinationGroupId) throws NiFiClientException, IOException {
final SnippetDTO snippetDto = new SnippetDTO();
snippetDto.setProcessGroups(Collections.singletonMap(pgEntity.getId(), pgEntity.getRevision()));

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.registry;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
public class ClusteredRegistryClientIT extends RegistryClientIT {
@Override
public NiFiInstanceFactory getInstanceFactory() {
return createTwoNodeInstanceFactory();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.nifi.tests.system.registry;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@ -30,6 +31,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -37,14 +39,87 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RegistryClientIT extends NiFiSystemIT {
@Test
public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "port-moved-groups", 1);
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
// Ensure that the import worked as expected
final FlowSnippetDTO groupContents = imported.getComponent().getContents();
final List<ProcessorDTO> replaceTextProcessors = groupContents.getProcessors().stream()
.filter(proc -> proc.getName().equals("ReplaceText"))
.collect(Collectors.toList());
assertEquals(1, replaceTextProcessors.size());
assertTrue(groupContents.getInputPorts().isEmpty());
// Change to version 2
final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2);
assertNull(version2Result.getRequest().getFailureReason());
final FlowDTO v2Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
getClientUtil().assertFlowUpToDate(imported.getId());
// Ensure that the ReplaceText processor still exists
final long replaceTextCount = v2Contents.getProcessors().stream()
.map(ProcessorEntity::getComponent)
.filter(proc -> proc.getName().equals("ReplaceText"))
.filter(proc -> proc.getId().equals(replaceTextProcessors.get(0).getId()))
.count();
assertEquals(1, replaceTextCount);
// Ensure that we now have a Port at the top level
assertEquals(1, v2Contents.getInputPorts().size());
// Change back to Version 1
final VersionedFlowUpdateRequestEntity changeBackToV1Result = getClientUtil().changeFlowVersion(imported.getId(), 1);
assertNull(changeBackToV1Result.getRequest().getFailureReason());
final FlowDTO v1Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
// Ensure that we no longer have a Port at the top level
assertTrue(v1Contents.getInputPorts().isEmpty());
}
@Test
public void testRollbackOnFailure() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "flow-with-invalid-connection", 1);
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2);
final String failureReason = version2Result.getRequest().getFailureReason();
assertNotNull(failureReason);
// Ensure that we're still on v1 of the flow and there are no local modifications
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
// Ensure that the processors still exist
final FlowDTO contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
assertEquals(1, contents.getProcessors().size());
}
@Test
public void testStartVersionControlThenImport() throws NiFiClientException, IOException {
final FlowRegistryClientEntity clientEntity = registerClient();
@ -68,10 +143,15 @@ public class RegistryClientIT extends NiFiSystemIT {
}
private FlowRegistryClientEntity registerClient() throws NiFiClientException, IOException {
final String clientName = String.format("FileRegistry-%s", UUID.randomUUID());
final FlowRegistryClientEntity clientEntity = getClientUtil().createFlowRegistryClient(clientName);
final File storageDir = new File("target/flowRegistryStorage/" + getTestName().replace("\\(.*?\\)", ""));
Files.createDirectories(storageDir.toPath());
return registerClient(storageDir);
}
private FlowRegistryClientEntity registerClient(final File storageDir) throws NiFiClientException, IOException {
final String clientName = String.format("FileRegistry-%s", UUID.randomUUID());
final FlowRegistryClientEntity clientEntity = getClientUtil().createFlowRegistryClient(clientName);
getClientUtil().updateRegistryClientProperties(clientEntity, Collections.singletonMap("Directory", storageDir.getAbsolutePath()));
return clientEntity;

View File

@ -27,7 +27,7 @@ java.arg.3=-Xmx512m
java.arg.14=-Djava.awt.headless=true
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8002
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
# Disable Logback web shutdown hook using System property
java.arg.logbackShutdown=-DlogbackDisableServletContainerInitializer=true

View File

@ -113,6 +113,7 @@
<logger name="org.apache.nifi.controller.reporting.LogComponentStatuses" level="ERROR" />
<logger name="org.apache.calcite.runtime.CalciteException" level="OFF" />
<logger name="deprecation" level="OFF" />
<logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" />
<logger name="org.apache.curator.ConnectionState" level="OFF" />

View File

@ -248,4 +248,4 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
# external properties files for variable registry
# supports a comma delimited list of file locations
nifi.variable.registry.properties=
nifi.variable.registry.properties=

View File

@ -0,0 +1,305 @@
{
"externalControllerServices": {},
"flowContents": {
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [
{
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10000,
"bends": [],
"componentType": "CONNECTION",
"destination": {
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"type": "INPUT_PORT"
},
"flowFileExpiration": "0 sec",
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
"instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
"labelIndex": 1,
"loadBalanceCompression": "DO_NOT_COMPRESS",
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"name": "",
"partitioningAttribute": "",
"prioritizers": [],
"selectedRelationships": [
"failure",
"success"
],
"source": {
"comments": "",
"groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
"id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
"instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
"name": "ReplaceText",
"type": "PROCESSOR"
},
"zIndex": 0
}
],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"inputPorts": [],
"instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
"labels": [],
"name": "parent",
"outputPorts": [],
"position": {
"x": 614.0,
"y": 462.0
},
"processGroups": [
{
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [
{
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10000,
"bends": [],
"componentType": "CONNECTION",
"destination": {
"comments": "",
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
"instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
"name": "UpdateAttribute",
"type": "PROCESSOR"
},
"flowFileExpiration": "0 sec",
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
"instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
"labelIndex": 1,
"loadBalanceCompression": "DO_NOT_COMPRESS",
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"name": "",
"partitioningAttribute": "",
"prioritizers": [],
"selectedRelationships": [
""
],
"source": {
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"type": "INPUT_PORT"
},
"zIndex": 0
}
],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"inputPorts": [
{
"allowRemoteAccess": false,
"componentType": "INPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"position": {
"x": 784.0,
"y": 146.0
},
"scheduledState": "ENABLED",
"type": "INPUT_PORT"
}
],
"instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
"labels": [],
"name": "child",
"outputPorts": [],
"position": {
"x": 824.0,
"y": 648.0
},
"processGroups": [],
"processors": [
{
"autoTerminatedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-update-attribute-nar",
"group": "org.apache.nifi",
"version": "1.16.0.2.1.4.0-53"
},
"comments": "",
"componentType": "PROCESSOR",
"concurrentlySchedulableTaskCount": 1,
"executionNode": "ALL",
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
"instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
"maxBackoffPeriod": "10 mins",
"name": "UpdateAttribute",
"penaltyDuration": "30 sec",
"position": {
"x": 735.0,
"y": 310.0
},
"properties": {
"Store State": "Do not store state",
"canonical-value-lookup-cache-size": "100"
},
"propertyDescriptors": {
"Delete Attributes Expression": {
"displayName": "Delete Attributes Expression",
"identifiesControllerService": false,
"name": "Delete Attributes Expression",
"sensitive": false
},
"Store State": {
"displayName": "Store State",
"identifiesControllerService": false,
"name": "Store State",
"sensitive": false
},
"canonical-value-lookup-cache-size": {
"displayName": "Cache Value Lookup Cache Size",
"identifiesControllerService": false,
"name": "canonical-value-lookup-cache-size",
"sensitive": false
},
"Stateful Variables Initial Value": {
"displayName": "Stateful Variables Initial Value",
"identifiesControllerService": false,
"name": "Stateful Variables Initial Value",
"sensitive": false
}
},
"retriedRelationships": [],
"retryCount": 10,
"runDurationMillis": 0,
"scheduledState": "ENABLED",
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"style": {},
"type": "org.apache.nifi.processors.attributes.UpdateAttribute",
"yieldDuration": "1 sec"
}
],
"remoteProcessGroups": [],
"variables": {}
}
],
"processors": [
{
"autoTerminatedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-standard-nar",
"group": "org.apache.nifi",
"version": "1.16.0.2.1.4.0-53"
},
"comments": "",
"componentType": "PROCESSOR",
"concurrentlySchedulableTaskCount": 1,
"executionNode": "ALL",
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
"instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
"maxBackoffPeriod": "10 mins",
"name": "ReplaceText",
"penaltyDuration": "30 sec",
"position": {
"x": 848.0,
"y": 384.0
},
"properties": {
"Regular Expression": "(?s)(^.*$)",
"Replacement Value": "$1",
"Evaluation Mode": "Line-by-Line",
"Line-by-Line Evaluation Mode": "All",
"Character Set": "UTF-8",
"Maximum Buffer Size": "1 MB",
"Replacement Strategy": "Regex Replace"
},
"propertyDescriptors": {
"Regular Expression": {
"displayName": "Search Value",
"identifiesControllerService": false,
"name": "Regular Expression",
"sensitive": false
},
"Replacement Value": {
"displayName": "Replacement Value",
"identifiesControllerService": false,
"name": "Replacement Value",
"sensitive": false
},
"Evaluation Mode": {
"displayName": "Evaluation Mode",
"identifiesControllerService": false,
"name": "Evaluation Mode",
"sensitive": false
},
"Line-by-Line Evaluation Mode": {
"displayName": "Line-by-Line Evaluation Mode",
"identifiesControllerService": false,
"name": "Line-by-Line Evaluation Mode",
"sensitive": false
},
"Character Set": {
"displayName": "Character Set",
"identifiesControllerService": false,
"name": "Character Set",
"sensitive": false
},
"Maximum Buffer Size": {
"displayName": "Maximum Buffer Size",
"identifiesControllerService": false,
"name": "Maximum Buffer Size",
"sensitive": false
},
"Replacement Strategy": {
"displayName": "Replacement Strategy",
"identifiesControllerService": false,
"name": "Replacement Strategy",
"sensitive": false
}
},
"retriedRelationships": [],
"retryCount": 10,
"runDurationMillis": 0,
"scheduledState": "ENABLED",
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"style": {},
"type": "org.apache.nifi.processors.standard.ReplaceText",
"yieldDuration": "1 sec"
}
],
"remoteProcessGroups": [],
"variables": {}
},
"flowEncodingVersion": "1.0",
"parameterContexts": {},
"snapshotMetadata": {
"author": "anonymous",
"comments": "",
"timestamp": 1676577758403,
"version": 1
}
}

View File

@ -0,0 +1,156 @@
{
"externalControllerServices": {},
"flowContents": {
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [
{
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10000,
"bends": [],
"componentType": "CONNECTION",
"destination": {
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"type": "INPUT_PORT"
},
"flowFileExpiration": "0 sec",
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
"instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
"labelIndex": 1,
"loadBalanceCompression": "DO_NOT_COMPRESS",
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"name": "",
"partitioningAttribute": "",
"prioritizers": [],
"selectedRelationships": [
"failure",
"success"
],
"source": {
"comments": "",
"groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
"id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
"instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
"name": "ReplaceText",
"type": "PROCESSOR"
},
"zIndex": 0
}
],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"inputPorts": [],
"instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
"labels": [],
"name": "parent",
"outputPorts": [],
"position": {
"x": 614.0,
"y": 462.0
},
"processGroups": [
{
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [
{
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10000,
"bends": [],
"componentType": "CONNECTION",
"destination": {
"comments": "",
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
"instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
"name": "UpdateAttribute",
"type": "PROCESSOR"
},
"flowFileExpiration": "0 sec",
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
"instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
"labelIndex": 1,
"loadBalanceCompression": "DO_NOT_COMPRESS",
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"name": "",
"partitioningAttribute": "",
"prioritizers": [],
"selectedRelationships": [
""
],
"source": {
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"type": "INPUT_PORT"
},
"zIndex": 0
}
],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"inputPorts": [
{
"allowRemoteAccess": false,
"componentType": "INPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"position": {
"x": 784.0,
"y": 146.0
},
"scheduledState": "ENABLED",
"type": "INPUT_PORT"
}
],
"instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
"labels": [],
"name": "child",
"outputPorts": [],
"position": {
"x": 824.0,
"y": 648.0
},
"processGroups": [],
"processors": [
],
"remoteProcessGroups": [],
"variables": {}
}
],
"processors": [
],
"remoteProcessGroups": [],
"variables": {}
},
"flowEncodingVersion": "1.0",
"parameterContexts": {},
"snapshotMetadata": {
"author": "anonymous",
"comments": "",
"timestamp": 1676577758403,
"version": 1
}
}

View File

@ -0,0 +1,305 @@
{
"externalControllerServices": {},
"flowContents": {
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [
{
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10000,
"bends": [],
"componentType": "CONNECTION",
"destination": {
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"type": "INPUT_PORT"
},
"flowFileExpiration": "0 sec",
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
"instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
"labelIndex": 1,
"loadBalanceCompression": "DO_NOT_COMPRESS",
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"name": "",
"partitioningAttribute": "",
"prioritizers": [],
"selectedRelationships": [
"failure",
"success"
],
"source": {
"comments": "",
"groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
"id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
"instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
"name": "ReplaceText",
"type": "PROCESSOR"
},
"zIndex": 0
}
],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"inputPorts": [],
"instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
"labels": [],
"name": "parent",
"outputPorts": [],
"position": {
"x": 614.0,
"y": 462.0
},
"processGroups": [
{
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [
{
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10000,
"bends": [],
"componentType": "CONNECTION",
"destination": {
"comments": "",
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
"instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
"name": "UpdateAttribute",
"type": "PROCESSOR"
},
"flowFileExpiration": "0 sec",
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
"instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
"labelIndex": 1,
"loadBalanceCompression": "DO_NOT_COMPRESS",
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"name": "",
"partitioningAttribute": "",
"prioritizers": [],
"selectedRelationships": [
""
],
"source": {
"groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"type": "INPUT_PORT"
},
"zIndex": 0
}
],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"inputPorts": [
{
"allowRemoteAccess": false,
"componentType": "INPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
"instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
"name": "in",
"position": {
"x": 784.0,
"y": 146.0
},
"scheduledState": "ENABLED",
"type": "INPUT_PORT"
}
],
"instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
"labels": [],
"name": "child",
"outputPorts": [],
"position": {
"x": 824.0,
"y": 648.0
},
"processGroups": [],
"processors": [
{
"autoTerminatedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-update-attribute-nar",
"group": "org.apache.nifi",
"version": "1.16.0.2.1.4.0-53"
},
"comments": "",
"componentType": "PROCESSOR",
"concurrentlySchedulableTaskCount": 1,
"executionNode": "ALL",
"groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
"identifier": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
"instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
"maxBackoffPeriod": "10 mins",
"name": "UpdateAttribute",
"penaltyDuration": "30 sec",
"position": {
"x": 735.0,
"y": 310.0
},
"properties": {
"Store State": "Do not store state",
"canonical-value-lookup-cache-size": "100"
},
"propertyDescriptors": {
"Delete Attributes Expression": {
"displayName": "Delete Attributes Expression",
"identifiesControllerService": false,
"name": "Delete Attributes Expression",
"sensitive": false
},
"Store State": {
"displayName": "Store State",
"identifiesControllerService": false,
"name": "Store State",
"sensitive": false
},
"canonical-value-lookup-cache-size": {
"displayName": "Cache Value Lookup Cache Size",
"identifiesControllerService": false,
"name": "canonical-value-lookup-cache-size",
"sensitive": false
},
"Stateful Variables Initial Value": {
"displayName": "Stateful Variables Initial Value",
"identifiesControllerService": false,
"name": "Stateful Variables Initial Value",
"sensitive": false
}
},
"retriedRelationships": [],
"retryCount": 10,
"runDurationMillis": 0,
"scheduledState": "ENABLED",
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"style": {},
"type": "org.apache.nifi.processors.attributes.UpdateAttribute",
"yieldDuration": "1 sec"
}
],
"remoteProcessGroups": [],
"variables": {}
}
],
"processors": [
{
"autoTerminatedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-standard-nar",
"group": "org.apache.nifi",
"version": "1.16.0.2.1.4.0-53"
},
"comments": "",
"componentType": "PROCESSOR",
"concurrentlySchedulableTaskCount": 1,
"executionNode": "ALL",
"groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
"identifier": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
"instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
"maxBackoffPeriod": "10 mins",
"name": "ReplaceText",
"penaltyDuration": "30 sec",
"position": {
"x": 848.0,
"y": 384.0
},
"properties": {
"Regular Expression": "(?s)(^.*$)",
"Replacement Value": "$1",
"Evaluation Mode": "Line-by-Line",
"Line-by-Line Evaluation Mode": "All",
"Character Set": "UTF-8",
"Maximum Buffer Size": "1 MB",
"Replacement Strategy": "Regex Replace"
},
"propertyDescriptors": {
"Regular Expression": {
"displayName": "Search Value",
"identifiesControllerService": false,
"name": "Regular Expression",
"sensitive": false
},
"Replacement Value": {
"displayName": "Replacement Value",
"identifiesControllerService": false,
"name": "Replacement Value",
"sensitive": false
},
"Evaluation Mode": {
"displayName": "Evaluation Mode",
"identifiesControllerService": false,
"name": "Evaluation Mode",
"sensitive": false
},
"Line-by-Line Evaluation Mode": {
"displayName": "Line-by-Line Evaluation Mode",
"identifiesControllerService": false,
"name": "Line-by-Line Evaluation Mode",
"sensitive": false
},
"Character Set": {
"displayName": "Character Set",
"identifiesControllerService": false,
"name": "Character Set",
"sensitive": false
},
"Maximum Buffer Size": {
"displayName": "Maximum Buffer Size",
"identifiesControllerService": false,
"name": "Maximum Buffer Size",
"sensitive": false
},
"Replacement Strategy": {
"displayName": "Replacement Strategy",
"identifiesControllerService": false,
"name": "Replacement Strategy",
"sensitive": false
}
},
"retriedRelationships": [],
"retryCount": 10,
"runDurationMillis": 0,
"scheduledState": "ENABLED",
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"style": {},
"type": "org.apache.nifi.processors.standard.ReplaceText",
"yieldDuration": "1 sec"
}
],
"remoteProcessGroups": [],
"variables": {}
},
"flowEncodingVersion": "1.0",
"parameterContexts": {},
"snapshotMetadata": {
"author": "anonymous",
"comments": "",
"timestamp": 1676577758403,
"version": 1
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
@ -70,4 +71,5 @@ public interface ProcessGroupClient {
FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnippetRequestEntity)
throws NiFiClientException, IOException;
FlowComparisonEntity getLocalModifications(String processGroupId) throws NiFiClientException, IOException;
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
@ -333,4 +334,19 @@ public class JerseyProcessGroupClient extends AbstractJerseyClient implements Pr
});
}
@Override
public FlowComparisonEntity getLocalModifications(final String processGroupId) throws NiFiClientException, IOException {
if (StringUtils.isBlank(processGroupId)) {
throw new IllegalArgumentException("Process group id cannot be null or blank");
}
return executeAction("Error retrieving list of local flow modifications", () -> {
final WebTarget target = processGroupsTarget
.path("{id}/local-modifications")
.resolveTemplate("id", processGroupId);
return getRequestBuilder(target).get(FlowComparisonEntity.class);
});
}
}