NIFI-13073 Changed Flow Version from int to String

- Updated user interfaces to support String version numbers
- Also updated new UI to clear out the buckets and flows when selections change 'above' them in the form when importing from registry

This closes #8674

Co-authored-by: Rob Fellows <rob.fellows@gmail.com>
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Bryan Bende 2024-04-18 14:41:17 -04:00 committed by exceptionfactory
parent 9098c013f2
commit a20c5d9dff
No known key found for this signature in database
51 changed files with 340 additions and 239 deletions

View File

@ -20,7 +20,7 @@ package org.apache.nifi.flow;
public class VersionedExternalFlowMetadata { public class VersionedExternalFlowMetadata {
private String bucketId; private String bucketId;
private String flowId; private String flowId;
private int version; private String version;
private String flowName; private String flowName;
private String author; private String author;
private String comments; private String comments;
@ -42,11 +42,11 @@ public class VersionedExternalFlowMetadata {
this.flowId = flowId; this.flowId = flowId;
} }
public int getVersion() { public String getVersion() {
return version; return version;
} }
public void setVersion(final int version) { public void setVersion(final String version) {
this.version = version; this.version = version;
} }

View File

@ -26,7 +26,7 @@ public class VersionedFlowCoordinates {
private String storageLocation; private String storageLocation;
private String bucketId; private String bucketId;
private String flowId; private String flowId;
private int version; private String version;
private Boolean latest; private Boolean latest;
@Schema(description = "The identifier of the Flow Registry that contains the flow") @Schema(description = "The identifier of the Flow Registry that contains the flow")
@ -66,11 +66,11 @@ public class VersionedFlowCoordinates {
} }
@Schema(description = "The version of the flow") @Schema(description = "The version of the flow")
public int getVersion() { public String getVersion() {
return version; return version;
} }
public void setVersion(int version) { public void setVersion(String version) {
this.version = version; this.version = version;
} }

View File

@ -19,6 +19,7 @@ package org.apache.nifi.registry.flow;
import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ConfigurableComponent;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
import java.util.Set; import java.util.Set;
/** /**
@ -165,20 +166,21 @@ public interface FlowRegistryClient extends ConfigurableComponent {
* @throws FlowRegistryException If an issue happens during processing the request. * @throws FlowRegistryException If an issue happens during processing the request.
* @throws IOException If there is issue with the communication between NiFi and the Flow Registry. * @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
*/ */
RegisteredFlowSnapshot getFlowContents(FlowRegistryClientConfigurationContext context, String bucketId, String flowId, int version) throws FlowRegistryException, IOException; RegisteredFlowSnapshot getFlowContents(FlowRegistryClientConfigurationContext context, String bucketId, String flowId, String version) throws FlowRegistryException, IOException;
/** /**
* Adds the given snapshot to the Flow Registry for the given Flow. * Adds the given snapshot to the Flow Registry for the given Flow.
* *
* @param context Configuration context. * @param context Configuration context.
* @param flowSnapshot The flow snapshot to register. * @param flowSnapshot The flow snapshot to register.
* @param action The register action
* *
* @return The flow snapshot. * @return The flow snapshot.
* *
* @throws FlowRegistryException If an issue happens during processing the request. * @throws FlowRegistryException If an issue happens during processing the request.
* @throws IOException If there is issue with the communication between NiFi and the Flow Registry. * @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
*/ */
RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException, IOException; RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction action) throws FlowRegistryException, IOException;
/** /**
* Retrieves the set of all versions of the specified flow. * Retrieves the set of all versions of the specified flow.
@ -199,14 +201,12 @@ public interface FlowRegistryClient extends ConfigurableComponent {
* *
* @param context Configuration context. * @param context Configuration context.
* @param bucketId The id of the bucket. * @param bucketId The id of the bucket.
* @param flowId The id of the flow. The result must be a positive number and the first version expected to have the version id of 1. * @param flowId The id of the flow.
* If by some reason the specified flow has no versions, the result will be 0, signing the lack of versions. 0 is not
* directly assigned to any actual version.
* *
* @return The latest version of the Flow. * @return an Optional containing the latest version of the flow, or empty if no versions exist for the flow
* *
* @throws FlowRegistryException If an issue happens during processing the request. * @throws FlowRegistryException If an issue happens during processing the request.
* @throws IOException If there is issue with the communication between NiFi and the Flow Registry. * @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
*/ */
int getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; Optional<String> getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) throws FlowRegistryException, IOException;
} }

View File

@ -0,0 +1,25 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.nifi.registry.flow;
public enum RegisterAction {
COMMIT,
FORCE_COMMIT;
}

View File

@ -32,6 +32,7 @@ public class RegisteredFlowSnapshot {
private Map<String, VersionedParameterContext> parameterContexts; private Map<String, VersionedParameterContext> parameterContexts;
private String flowEncodingVersion; private String flowEncodingVersion;
private Map<String, ParameterProviderReference> parameterProviders; private Map<String, ParameterProviderReference> parameterProviders;
private boolean latest;
public RegisteredFlowSnapshotMetadata getSnapshotMetadata() { public RegisteredFlowSnapshotMetadata getSnapshotMetadata() {
return snapshotMetadata; return snapshotMetadata;
@ -62,7 +63,7 @@ public class RegisteredFlowSnapshot {
} }
public boolean isLatest() { public boolean isLatest() {
return flow != null && snapshotMetadata != null && flow.getVersionCount() == getSnapshotMetadata().getVersion(); return latest;
} }
public void setSnapshotMetadata(final RegisteredFlowSnapshotMetadata snapshotMetadata) { public void setSnapshotMetadata(final RegisteredFlowSnapshotMetadata snapshotMetadata) {
@ -100,4 +101,8 @@ public class RegisteredFlowSnapshot {
public void setParameterProviders(final Map<String, ParameterProviderReference> parameterProviders) { public void setParameterProviders(final Map<String, ParameterProviderReference> parameterProviders) {
this.parameterProviders = parameterProviders; this.parameterProviders = parameterProviders;
} }
public void setLatest(final boolean latest) {
this.latest = latest;
}
} }

View File

@ -19,7 +19,7 @@ package org.apache.nifi.registry.flow;
public class RegisteredFlowSnapshotMetadata { public class RegisteredFlowSnapshotMetadata {
private String bucketIdentifier; private String bucketIdentifier;
private String flowIdentifier; private String flowIdentifier;
private int version; private String version;
private long timestamp; private long timestamp;
private String author; private String author;
private String comments; private String comments;
@ -32,7 +32,7 @@ public class RegisteredFlowSnapshotMetadata {
return flowIdentifier; return flowIdentifier;
} }
public int getVersion() { public String getVersion() {
return version; return version;
} }
@ -56,7 +56,7 @@ public class RegisteredFlowSnapshotMetadata {
this.flowIdentifier = flowIdentifier; this.flowIdentifier = flowIdentifier;
} }
public void setVersion(int version) { public void setVersion(String version) {
this.version = version; this.version = version;
} }

View File

@ -40,6 +40,7 @@ import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -229,45 +230,62 @@ public class NifiRegistryFlowRegistryClient extends AbstractFlowRegistryClient {
@Override @Override
public RegisteredFlowSnapshot getFlowContents( public RegisteredFlowSnapshot getFlowContents(
final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version
) throws FlowRegistryException, IOException { ) throws FlowRegistryException, IOException {
try { try {
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context); final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context);
final VersionedFlowSnapshot snapshot = snapshotClient.get(bucketId, flowId, version); final VersionedFlowSnapshot snapshot = snapshotClient.get(bucketId, flowId, Integer.parseInt(version));
if (snapshot == null) { if (snapshot == null) {
throw new NoSuchFlowVersionException(String.format("Version %d of flow %s does not exist in bucket %s", version, flowId, bucketId)); throw new NoSuchFlowVersionException(String.format("Version %s of flow %s does not exist in bucket %s", version, flowId, bucketId));
} }
return NifiRegistryUtil.convert(snapshot); final RegisteredFlowSnapshot registeredFlowSnapshot = NifiRegistryUtil.convert(snapshot);
registeredFlowSnapshot.setLatest(snapshot.getSnapshotMetadata().getVersion() == snapshot.getFlow().getVersionCount());
return registeredFlowSnapshot;
} catch (final NiFiRegistryException e) { } catch (final NiFiRegistryException e) {
throw new FlowRegistryException(e.getMessage(), e); throw new FlowRegistryException(e.getMessage(), e);
} }
} }
@Override @Override
public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException, IOException { public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction action)
throws FlowRegistryException, IOException {
try { try {
final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
final int snapshotVersion = getRegisteredFlowSnapshotVersion(snapshotMetadata, action);
snapshotMetadata.setVersion(String.valueOf(snapshotVersion));
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context); final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context);
final VersionedFlowSnapshot versionedFlowSnapshot = snapshotClient.create(NifiRegistryUtil.convert(flowSnapshot)); final VersionedFlowSnapshot versionedFlowSnapshot = snapshotClient.create(NifiRegistryUtil.convert(flowSnapshot));
final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates();
final String bucketId = versionedFlowSnapshot.getFlow().getBucketIdentifier(); final String bucketId = versionedFlowSnapshot.getFlow().getBucketIdentifier();
final String flowId = versionedFlowSnapshot.getFlow().getIdentifier(); final String flowId = versionedFlowSnapshot.getFlow().getIdentifier();
final int version = (int) versionedFlowSnapshot.getFlow().getVersionCount(); final int version = (int) versionedFlowSnapshot.getFlow().getVersionCount();
final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates();
versionedFlowCoordinates.setRegistryId(getIdentifier()); versionedFlowCoordinates.setRegistryId(getIdentifier());
versionedFlowCoordinates.setBucketId(bucketId); versionedFlowCoordinates.setBucketId(bucketId);
versionedFlowCoordinates.setFlowId(flowId); versionedFlowCoordinates.setFlowId(flowId);
versionedFlowCoordinates.setVersion(version); versionedFlowCoordinates.setVersion(String.valueOf(version));
versionedFlowCoordinates.setStorageLocation(getProposedUri(context) + "/nifi-registry-api/buckets/" + bucketId + "/flows/" + flowId + "/versions/" + version); versionedFlowCoordinates.setStorageLocation(getProposedUri(context) + "/nifi-registry-api/buckets/" + bucketId + "/flows/" + flowId + "/versions/" + version);
versionedFlowSnapshot.getFlowContents().setVersionedFlowCoordinates(versionedFlowCoordinates); versionedFlowSnapshot.getFlowContents().setVersionedFlowCoordinates(versionedFlowCoordinates);
return NifiRegistryUtil.convert(versionedFlowSnapshot);
final RegisteredFlowSnapshot registeredFlowSnapshot = NifiRegistryUtil.convert(versionedFlowSnapshot);
registeredFlowSnapshot.setLatest(true);
return registeredFlowSnapshot;
} catch (NiFiRegistryException e) { } catch (NiFiRegistryException e) {
throw new FlowRegistryException(e.getMessage(), e); throw new FlowRegistryException(e.getMessage(), e);
} }
} }
private static int getRegisteredFlowSnapshotVersion(final RegisteredFlowSnapshotMetadata snapshotMetadata, final RegisterAction action) {
if (RegisterAction.FORCE_COMMIT == action) {
return -1;
}
return snapshotMetadata.getVersion() == null ? 1 : Integer.parseInt(snapshotMetadata.getVersion()) + 1;
}
@Override @Override
public Set<RegisteredFlowSnapshotMetadata> getFlowVersions( public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(
final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId
@ -281,9 +299,10 @@ public class NifiRegistryFlowRegistryClient extends AbstractFlowRegistryClient {
} }
@Override @Override
public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { public Optional<String> getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
try { try {
return (int) getFlowClient(context).get(bucketId, flowId).getVersionCount(); final int versionCount = (int) getFlowClient(context).get(bucketId, flowId).getVersionCount();
return versionCount == 0 ? Optional.empty() : Optional.of(String.valueOf(versionCount));
} catch (NiFiRegistryException e) { } catch (NiFiRegistryException e) {
throw new FlowRegistryException(e.getMessage(), e); throw new FlowRegistryException(e.getMessage(), e);
} }

View File

@ -77,7 +77,7 @@ final class NifiRegistryUtil {
final RegisteredFlowSnapshotMetadata result = new RegisteredFlowSnapshotMetadata(); final RegisteredFlowSnapshotMetadata result = new RegisteredFlowSnapshotMetadata();
result.setBucketIdentifier(metadata.getBucketIdentifier()); result.setBucketIdentifier(metadata.getBucketIdentifier());
result.setFlowIdentifier(metadata.getFlowIdentifier()); result.setFlowIdentifier(metadata.getFlowIdentifier());
result.setVersion(metadata.getVersion()); result.setVersion(String.valueOf(metadata.getVersion()));
result.setTimestamp(metadata.getTimestamp()); result.setTimestamp(metadata.getTimestamp());
result.setAuthor(metadata.getAuthor()); result.setAuthor(metadata.getAuthor());
result.setComments(metadata.getComments()); result.setComments(metadata.getComments());
@ -88,7 +88,7 @@ final class NifiRegistryUtil {
final VersionedFlowSnapshotMetadata result = new VersionedFlowSnapshotMetadata(); final VersionedFlowSnapshotMetadata result = new VersionedFlowSnapshotMetadata();
result.setBucketIdentifier(metadata.getBucketIdentifier()); result.setBucketIdentifier(metadata.getBucketIdentifier());
result.setFlowIdentifier(metadata.getFlowIdentifier()); result.setFlowIdentifier(metadata.getFlowIdentifier());
result.setVersion(metadata.getVersion()); result.setVersion(Integer.parseInt(metadata.getVersion()));
result.setTimestamp(metadata.getTimestamp()); result.setTimestamp(metadata.getTimestamp());
result.setAuthor(metadata.getAuthor()); result.setAuthor(metadata.getAuthor());
result.setComments(metadata.getComments()); result.setComments(metadata.getComments());

View File

@ -38,7 +38,7 @@ public class VersionControlInformationDTO {
private String flowId; private String flowId;
private String flowName; private String flowName;
private String flowDescription; private String flowDescription;
private Integer version; private String version;
private String storageLocation; private String storageLocation;
private String state; private String state;
private String stateExplanation; private String stateExplanation;
@ -116,11 +116,11 @@ public class VersionControlInformationDTO {
} }
@Schema(description = "The version of the flow") @Schema(description = "The version of the flow")
public Integer getVersion() { public String getVersion() {
return version; return version;
} }
public void setVersion(final Integer version) { public void setVersion(final String version) {
this.version = version; this.version = version;
} }

View File

@ -346,7 +346,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final String registryId = determineRegistryId(remoteCoordinates); final String registryId = determineRegistryId(remoteCoordinates);
final String bucketId = remoteCoordinates.getBucketId(); final String bucketId = remoteCoordinates.getBucketId();
final String flowId = remoteCoordinates.getFlowId(); final String flowId = remoteCoordinates.getFlowId();
final int version = remoteCoordinates.getVersion(); final String version = remoteCoordinates.getVersion();
final String storageLocation = remoteCoordinates.getStorageLocation(); final String storageLocation = remoteCoordinates.getStorageLocation();
final FlowRegistryClientNode flowRegistry = context.getFlowManager().getFlowRegistryClient(registryId); final FlowRegistryClientNode flowRegistry = context.getFlowManager().getFlowRegistryClient(registryId);

View File

@ -3505,7 +3505,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
private boolean isModified() { private boolean isModified() {
if (versionControlInformation.getVersion() == 0) { if (versionControlInformation.getVersion() == null) {
return true; return true;
} }
@ -3741,7 +3741,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
final VersionedProcessGroup snapshot = vci.getFlowSnapshot(); final VersionedProcessGroup snapshot = vci.getFlowSnapshot();
if (snapshot == null && vci.getVersion() > 0) { if (snapshot == null && vci.getVersion() != null) {
// We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry. // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
// This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry. // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
try { try {
@ -3774,15 +3774,15 @@ public final class StandardProcessGroup implements ProcessGroup {
try { try {
final RegisteredFlow versionedFlow = flowRegistry.getFlow(FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier()); final RegisteredFlow versionedFlow = flowRegistry.getFlow(FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier());
final int latestVersion = (int) versionedFlow.getVersionCount(); final String latestVersion = flowRegistry.getLatestVersion(FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier()).orElse(null);
vci.setBucketName(versionedFlow.getBucketName()); vci.setBucketName(versionedFlow.getBucketName());
vci.setFlowName(versionedFlow.getName()); vci.setFlowName(versionedFlow.getName());
vci.setFlowDescription(versionedFlow.getDescription()); vci.setFlowDescription(versionedFlow.getDescription());
vci.setRegistryName(flowRegistry.getName()); vci.setRegistryName(flowRegistry.getName());
if (latestVersion == vci.getVersion()) { if (Objects.equals(latestVersion, vci.getVersion())) {
versionControlFields.setStale(false); versionControlFields.setStale(false);
if (latestVersion == 0) { if (latestVersion == null) {
LOG.debug("{} does not have any version in the Registry", this); LOG.debug("{} does not have any version in the Registry", this);
versionControlFields.setLocallyModified(true); versionControlFields.setLocallyModified(true);
} else { } else {

View File

@ -93,14 +93,15 @@ public final class FlowAnalyzingRegistryClientNode implements FlowRegistryClient
final Map<String, VersionedParameterContext> parameterContexts, final Map<String, VersionedParameterContext> parameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, final Map<String, ParameterProviderReference> parameterProviderReferences,
final String comments, final String comments,
final int expectedVersion final String expectedVersion,
final RegisterAction registerAction
) throws FlowRegistryException, IOException { ) throws FlowRegistryException, IOException {
if (LOGGER.isTraceEnabled()) { if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Snapshot for flow {} is checked for violations before commit", snapshot.getInstanceIdentifier()); LOGGER.trace("Snapshot for flow {} is checked for violations before commit", snapshot.getInstanceIdentifier());
} }
if (analyzeProcessGroupToRegister(snapshot)) { if (analyzeProcessGroupToRegister(snapshot)) {
return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion); return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion, registerAction);
} else { } else {
throw new FlowRegistryPreCommitException("There are unresolved rule violations"); throw new FlowRegistryPreCommitException("There are unresolved rule violations");
} }
@ -431,7 +432,7 @@ public final class FlowAnalyzingRegistryClientNode implements FlowRegistryClient
@Override @Override
public FlowSnapshotContainer getFlowContents( public FlowSnapshotContainer getFlowContents(
final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final String version, final boolean fetchRemoteFlows
) throws FlowRegistryException, IOException { ) throws FlowRegistryException, IOException {
return node.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows); return node.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
} }
@ -442,7 +443,7 @@ public final class FlowAnalyzingRegistryClientNode implements FlowRegistryClient
} }
@Override @Override
public int getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { public Optional<String> getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
return node.getLatestVersion(context, bucketId, flowId); return node.getLatestVersion(context, bucketId, flowId);
} }

View File

@ -223,7 +223,7 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
@Override @Override
public FlowSnapshotContainer getFlowContents( public FlowSnapshotContainer getFlowContents(
final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final String version, final boolean fetchRemoteFlows
) throws FlowRegistryException, IOException { ) throws FlowRegistryException, IOException {
final RegisteredFlowSnapshot flowSnapshot = execute(() -> client.get().getComponent().getFlowContents(getConfigurationContext(context), bucketId, flowId, version)); final RegisteredFlowSnapshot flowSnapshot = execute(() -> client.get().getComponent().getFlowContents(getConfigurationContext(context), bucketId, flowId, version));
@ -252,11 +252,12 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
final Map<String, VersionedParameterContext> parameterContexts, final Map<String, VersionedParameterContext> parameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, final Map<String, ParameterProviderReference> parameterProviderReferences,
final String comments, final String comments,
final int expectedVersion) throws FlowRegistryException, IOException { final String expectedVersion,
final RegisterAction registerAction) throws FlowRegistryException, IOException {
final RegisteredFlowSnapshot registeredFlowSnapshot = createRegisteredFlowSnapshot( final RegisteredFlowSnapshot registeredFlowSnapshot = createRegisteredFlowSnapshot(
context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion); context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion);
return execute(() -> client.get().getComponent().registerFlowSnapshot(getConfigurationContext(context), registeredFlowSnapshot)); return execute(() -> client.get().getComponent().registerFlowSnapshot(getConfigurationContext(context), registeredFlowSnapshot, registerAction));
} }
@Override @Override
@ -265,7 +266,7 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
} }
@Override @Override
public int getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { public Optional<String> getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
return execute(() -> client.get().getComponent().getLatestVersion(getConfigurationContext(context), bucketId, flowId)); return execute(() -> client.get().getComponent().getLatestVersion(getConfigurationContext(context), bucketId, flowId));
} }
@ -370,7 +371,7 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
final String storageLocation = coordinates.getStorageLocation(); final String storageLocation = coordinates.getStorageLocation();
final String bucketId = coordinates.getBucketId(); final String bucketId = coordinates.getBucketId();
final String flowId = coordinates.getFlowId(); final String flowId = coordinates.getFlowId();
final int version = coordinates.getVersion(); final String version = coordinates.getVersion();
final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation); final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
for (final FlowRegistryClientNode clientNode : clientNodes) { for (final FlowRegistryClientNode clientNode : clientNodes) {
@ -406,7 +407,7 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
final Map<String, VersionedParameterContext> parameterContexts, final Map<String, VersionedParameterContext> parameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, final Map<String, ParameterProviderReference> parameterProviderReferences,
final String comments, final String comments,
final int expectedVersion) { final String expectedVersion) {
final RegisteredFlowSnapshotMetadata metadata = new RegisteredFlowSnapshotMetadata(); final RegisteredFlowSnapshotMetadata metadata = new RegisteredFlowSnapshotMetadata();
metadata.setBucketIdentifier(flow.getBucketIdentifier()); metadata.setBucketIdentifier(flow.getBucketIdentifier());
metadata.setFlowIdentifier(flow.getIdentifier()); metadata.setFlowIdentifier(flow.getIdentifier());

View File

@ -32,7 +32,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
private volatile String flowName; private volatile String flowName;
private volatile String flowDescription; private volatile String flowDescription;
private volatile String storageLocation; private volatile String storageLocation;
private final int version; private final String version;
private volatile VersionedProcessGroup flowSnapshot; private volatile VersionedProcessGroup flowSnapshot;
private final VersionedFlowStatus status; private final VersionedFlowStatus status;
@ -45,7 +45,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
private String flowName; private String flowName;
private String flowDescription; private String flowDescription;
private String storageLocation; private String storageLocation;
private int version; private String version;
private VersionedProcessGroup flowSnapshot; private VersionedProcessGroup flowSnapshot;
private VersionedFlowStatus status; private VersionedFlowStatus status;
@ -89,7 +89,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
return this; return this;
} }
public Builder version(int version) { public Builder version(String version) {
this.version = version; this.version = version;
return this; return this;
} }
@ -149,7 +149,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
} }
public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version, public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final String version,
final String storageLocation, final VersionedProcessGroup snapshot, final VersionedFlowStatus status) { final String storageLocation, final VersionedProcessGroup snapshot, final VersionedFlowStatus status) {
this.registryIdentifier = registryId; this.registryIdentifier = registryId;
this.registryName = registryName; this.registryName = registryName;
@ -223,7 +223,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
} }
@Override @Override
public int getVersion() { public String getVersion() {
return version; return version;
} }

View File

@ -47,7 +47,7 @@ import java.util.UUID;
class FlowAnalyzingRegistryClientNodeTest { class FlowAnalyzingRegistryClientNodeTest {
private final static String INSTANCE_IDENTIFIER = UUID.randomUUID().toString(); private final static String INSTANCE_IDENTIFIER = UUID.randomUUID().toString();
private final static String COMMENT_TEXT = "comment"; private final static String COMMENT_TEXT = "comment";
private final static int EXPECTED_VERSION = 3; private final static String EXPECTED_VERSION = "3";
@Mock @Mock
FlowRegistryClientNode node; FlowRegistryClientNode node;
@ -103,11 +103,13 @@ class FlowAnalyzingRegistryClientNodeTest {
Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.emptyList()); Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.emptyList());
final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper); final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper);
testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT);
Mockito Mockito
.verify(node, Mockito.only()) .verify(node, Mockito.only())
.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT);
} }
@Test @Test
@ -115,11 +117,13 @@ class FlowAnalyzingRegistryClientNodeTest {
Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.singletonList(ruleViolation3)); Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.singletonList(ruleViolation3));
final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper); final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper);
testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT);
Mockito Mockito
.verify(node, Mockito.only()) .verify(node, Mockito.only())
.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT);
} }
@Test @Test
@ -129,11 +133,13 @@ class FlowAnalyzingRegistryClientNodeTest {
Assertions.assertThrows( Assertions.assertThrows(
FlowRegistryPreCommitException.class, FlowRegistryPreCommitException.class,
() -> testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION) () -> testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT)
); );
Mockito Mockito
.verify(node, Mockito.never()) .verify(node, Mockito.never())
.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT);
} }
} }

View File

@ -25,6 +25,7 @@ import org.apache.nifi.flow.VersionedProcessGroup;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
public interface FlowRegistryClientNode extends ComponentNode { public interface FlowRegistryClientNode extends ComponentNode {
@ -44,7 +45,7 @@ public interface FlowRegistryClientNode extends ComponentNode {
RegisteredFlow getFlow(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; RegisteredFlow getFlow(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException;
Set<RegisteredFlow> getFlows(FlowRegistryClientUserContext context, String bucketId) throws FlowRegistryException, IOException; Set<RegisteredFlow> getFlows(FlowRegistryClientUserContext context, String bucketId) throws FlowRegistryException, IOException;
FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext context, String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws FlowRegistryException, IOException; FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext context, String bucketId, String flowId, String version, boolean fetchRemoteFlows) throws FlowRegistryException, IOException;
RegisteredFlowSnapshot registerFlowSnapshot( RegisteredFlowSnapshot registerFlowSnapshot(
FlowRegistryClientUserContext context, FlowRegistryClientUserContext context,
RegisteredFlow flow, RegisteredFlow flow,
@ -52,11 +53,11 @@ public interface FlowRegistryClientNode extends ComponentNode {
Map<String, ExternalControllerServiceReference> externalControllerServices, Map<String, ExternalControllerServiceReference> externalControllerServices,
Map<String, VersionedParameterContext> parameterContexts, Map<String, VersionedParameterContext> parameterContexts,
Map<String, ParameterProviderReference> parameterProviderReferences, String comments, Map<String, ParameterProviderReference> parameterProviderReferences, String comments,
int expectedVersion String expectedVersion, RegisterAction registerAction
) throws FlowRegistryException, IOException; ) throws FlowRegistryException, IOException;
Set<RegisteredFlowSnapshotMetadata> getFlowVersions(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; Set<RegisteredFlowSnapshotMetadata> getFlowVersions(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException;
int getLatestVersion(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; Optional<String> getLatestVersion(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException;
void setComponent(LoggableComponent<FlowRegistryClient> component); void setComponent(LoggableComponent<FlowRegistryClient> component);
} }

View File

@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationResult;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
public class GhostFlowRegistryClient implements FlowRegistryClient { public class GhostFlowRegistryClient implements FlowRegistryClient {
@ -112,12 +113,14 @@ public class GhostFlowRegistryClient implements FlowRegistryClient {
} }
@Override @Override
public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, int version) throws FlowRegistryException { public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version)
throws FlowRegistryException {
throw new FlowRegistryException(ERROR_MESSAGE); throw new FlowRegistryException(ERROR_MESSAGE);
} }
@Override @Override
public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException { public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot, final RegisterAction action)
throws FlowRegistryException {
throw new FlowRegistryException(ERROR_MESSAGE); throw new FlowRegistryException(ERROR_MESSAGE);
} }
@ -127,7 +130,7 @@ public class GhostFlowRegistryClient implements FlowRegistryClient {
} }
@Override @Override
public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException { public Optional<String> getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException {
throw new FlowRegistryException(ERROR_MESSAGE); throw new FlowRegistryException(ERROR_MESSAGE);
} }
} }

View File

@ -70,7 +70,7 @@ public interface VersionControlInformation {
/** /**
* @return the version of the flow in the Flow Registry that this flow is based on. * @return the version of the flow in the Flow Registry that this flow is based on.
*/ */
int getVersion(); String getVersion();
/** /**
* @return the current status of the Process Group as it relates to the associated Versioned Flow. * @return the current status of the Process Group as it relates to the associated Versioned Flow.

View File

@ -595,7 +595,7 @@ public class NiFiRegistryFlowMapperTest {
when(versionControlInformation.getRegistryIdentifier()).thenReturn(UUID.randomUUID().toString()); when(versionControlInformation.getRegistryIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getBucketIdentifier()).thenReturn(UUID.randomUUID().toString()); when(versionControlInformation.getBucketIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getFlowIdentifier()).thenReturn(UUID.randomUUID().toString()); when(versionControlInformation.getFlowIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getVersion()).thenReturn(counter++); when(versionControlInformation.getVersion()).thenReturn(String.valueOf(counter++));
when(versionControlInformation.getStorageLocation()).thenReturn("http://localhost:18080"); when(versionControlInformation.getStorageLocation()).thenReturn("http://localhost:18080");
return versionControlInformation; return versionControlInformation;
} }

View File

@ -36,6 +36,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterGroupConfiguration; import org.apache.nifi.parameter.ParameterGroupConfiguration;
import org.apache.nifi.registry.flow.FlowSnapshotContainer; import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisterAction;
import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AccessPolicyDTO;
@ -1517,7 +1518,7 @@ public interface NiFiServiceFacade {
Map<String, VersionedParameterContext> parameterContexts, Map<String, VersionedParameterContext> parameterContexts,
Map<String, ParameterProviderReference> parameterProviderReferences, Map<String, ParameterProviderReference> parameterProviderReferences,
Map<String, ExternalControllerServiceReference> externalControllerServiceReferences, Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
String comments, int expectedVersion); String comments, String expectedVersion, RegisterAction registerAction);
/** /**
* Updates the Version Control Information on the Process Group with the given ID * Updates the Version Control Information on the Process Group with the given ID

View File

@ -150,6 +150,7 @@ import org.apache.nifi.registry.flow.FlowRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryPermissions; import org.apache.nifi.registry.flow.FlowRegistryPermissions;
import org.apache.nifi.registry.flow.FlowRegistryUtil; import org.apache.nifi.registry.flow.FlowRegistryUtil;
import org.apache.nifi.registry.flow.FlowSnapshotContainer; import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisterAction;
import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
@ -4975,14 +4976,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow(); final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
int snapshotVersion;
if (VersionedFlowDTO.FORCE_COMMIT_ACTION.equals(versionedFlowDto.getAction())) {
snapshotVersion = -1;
} else {
final VersionControlInformation currentVci = processGroup.getVersionControlInformation(); final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
snapshotVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
} final String snapshotVersion = currentVci == null ? null : currentVci.getVersion();
final RegisterAction registerAction = RegisterAction.valueOf(versionedFlowDto.getAction());
// Create a VersionedProcessGroup snapshot of the flow as it is currently. // Create a VersionedProcessGroup snapshot of the flow as it is currently.
final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId); final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
@ -5021,7 +5018,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
try { try {
// add a snapshot to the flow in the registry // add a snapshot to the flow in the registry
registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, parameterContexts, parameterProviderReferences, registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, parameterContexts, parameterProviderReferences,
versionedProcessGroup.getExternalControllerServiceReferences(), versionedFlowDto.getComments(), snapshotVersion); versionedProcessGroup.getExternalControllerServiceReferences(), versionedFlowDto.getComments(), snapshotVersion, registerAction);
} catch (final NiFiCoreException e) { } catch (final NiFiCoreException e) {
// If the flow has been created, but failed to add a snapshot, // If the flow has been created, but failed to add a snapshot,
// then we need to capture the created versioned flow information as a partial successful result. // then we need to capture the created versioned flow information as a partial successful result.
@ -5177,7 +5174,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
* @return a VersionedFlowSnapshot from a registry with the given version * @return a VersionedFlowSnapshot from a registry with the given version
*/ */
private FlowSnapshotContainer getVersionedFlowSnapshot(final String registryId, final String bucketId, final String flowId, private FlowSnapshotContainer getVersionedFlowSnapshot(final String registryId, final String bucketId, final String flowId,
final Integer flowVersion, final boolean fetchRemoteFlows) { final String flowVersion, final boolean fetchRemoteFlows) {
final FlowRegistryClientNode flowRegistry = flowRegistryDAO.getFlowRegistryClient(registryId); final FlowRegistryClientNode flowRegistry = flowRegistryDAO.getFlowRegistryClient(registryId);
if (flowRegistry == null) { if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId); throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
@ -5327,7 +5324,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Map<String, VersionedParameterContext> parameterContexts, final Map<String, VersionedParameterContext> parameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, final Map<String, ParameterProviderReference> parameterProviderReferences,
final Map<String,ExternalControllerServiceReference> externalControllerServiceReferences, final String comments, final Map<String,ExternalControllerServiceReference> externalControllerServiceReferences, final String comments,
final int expectedVersion) { final String expectedVersion, final RegisterAction registerAction) {
final FlowRegistryClientNode registry = flowRegistryDAO.getFlowRegistryClient(registryId); final FlowRegistryClientNode registry = flowRegistryDAO.getFlowRegistryClient(registryId);
if (registry == null) { if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
@ -5336,7 +5333,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
try { try {
return registry.registerFlowSnapshot(FlowRegistryClientContextFactory.getContextForUser( return registry.registerFlowSnapshot(FlowRegistryClientContextFactory.getContextForUser(
NiFiUserUtils.getNiFiUser()), flow, snapshot, externalControllerServiceReferences, parameterContexts, NiFiUserUtils.getNiFiUser()), flow, snapshot, externalControllerServiceReferences, parameterContexts,
parameterProviderReferences, comments, expectedVersion); parameterProviderReferences, comments, expectedVersion, registerAction);
} catch (final IOException | FlowRegistryException e) { } catch (final IOException | FlowRegistryException e) {
throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e); throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
} }

View File

@ -178,7 +178,7 @@ public class VersionsResource extends FlowUpdateResource<VersionControlInformati
final VersionedProcessGroup versionedProcessGroup = versionedFlowSnapshot.getFlowContents(); final VersionedProcessGroup versionedProcessGroup = versionedFlowSnapshot.getFlowContents();
final String flowName = versionedProcessGroup.getName(); final String flowName = versionedProcessGroup.getName();
final int flowVersion = versionedFlowSnapshot.getSnapshotMetadata().getVersion(); final String flowVersion = versionedFlowSnapshot.getSnapshotMetadata().getVersion();
// clear top-level registry data which doesn't belong in versioned flow download // clear top-level registry data which doesn't belong in versioned flow download
versionedFlowSnapshot.setFlow(null); versionedFlowSnapshot.setFlow(null);

View File

@ -33,6 +33,7 @@ import org.apache.nifi.web.dao.FlowRegistryDAO;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -162,7 +163,7 @@ public class StandardFlowRegistryDAO extends ComponentDAO implements FlowRegistr
} }
final Set<RegisteredFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(context, bucketId, flowId); final Set<RegisteredFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(context, bucketId, flowId);
final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>((f1, f2) -> Integer.compare(f1.getVersion(), f2.getVersion())); final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp));
sortedFlowVersions.addAll(flowVersions); sortedFlowVersions.addAll(flowVersions);
return sortedFlowVersions; return sortedFlowVersions;
} catch (final IOException | FlowRegistryException ioe) { } catch (final IOException | FlowRegistryException ioe) {

View File

@ -53,7 +53,7 @@ public class TestVersionsResource {
when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(snapshotContainer); when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(snapshotContainer);
final String flowName = "flowname"; final String flowName = "flowname";
final int flowVersion = 1; final String flowVersion = "1";
final VersionedProcessGroup versionedProcessGroup = mock(VersionedProcessGroup.class); final VersionedProcessGroup versionedProcessGroup = mock(VersionedProcessGroup.class);
final RegisteredFlowSnapshotMetadata snapshotMetadata = mock(RegisteredFlowSnapshotMetadata.class); final RegisteredFlowSnapshotMetadata snapshotMetadata = mock(RegisteredFlowSnapshotMetadata.class);
when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup); when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup);

View File

@ -247,7 +247,7 @@ export interface VersionControlInformation {
flowId: string; flowId: string;
flowName: string; flowName: string;
flowDescription: string; flowDescription: string;
version: number; version: string;
storageLocation?: string; storageLocation?: string;
state: string; state: string;
stateExplanation: string; stateExplanation: string;
@ -523,21 +523,6 @@ export interface CopiedSnippet {
Tooltips Tooltips
*/ */
export interface VersionControlInformation {
groupId: string;
registryId: string;
registryName: string;
bucketId: string;
bucketName: string;
flowId: string;
flowName: string;
flowDescription: string;
version: number;
state: string;
stateExplanation: string;
storageLocation?: string;
}
export interface VersionControlTipInput { export interface VersionControlTipInput {
versionControlInformation: VersionControlInformation; versionControlInformation: VersionControlInformation;
} }

View File

@ -43,7 +43,7 @@ describe('ChangeVersionDialog', () => {
flowId: 'e884a53c-cbc2-4cb6-9ebd-d9e5d5bb7d05', flowId: 'e884a53c-cbc2-4cb6-9ebd-d9e5d5bb7d05',
flowName: 'sdaf', flowName: 'sdaf',
flowDescription: '', flowDescription: '',
version: 2, version: '2',
state: 'UP_TO_DATE', state: 'UP_TO_DATE',
stateExplanation: 'Flow version is current' stateExplanation: 'Flow version is current'
}, },
@ -52,7 +52,7 @@ describe('ChangeVersionDialog', () => {
versionedFlowSnapshotMetadata: { versionedFlowSnapshotMetadata: {
bucketIdentifier: 'bd6fa6cc-da95-4a12-92cc-9b38b3d48266', bucketIdentifier: 'bd6fa6cc-da95-4a12-92cc-9b38b3d48266',
flowIdentifier: 'e884a53c-cbc2-4cb6-9ebd-d9e5d5bb7d05', flowIdentifier: 'e884a53c-cbc2-4cb6-9ebd-d9e5d5bb7d05',
version: 2, version: '2',
timestamp: 1712171233843, timestamp: 1712171233843,
author: 'anonymous', author: 'anonymous',
comments: '' comments: ''
@ -63,7 +63,7 @@ describe('ChangeVersionDialog', () => {
versionedFlowSnapshotMetadata: { versionedFlowSnapshotMetadata: {
bucketIdentifier: 'bd6fa6cc-da95-4a12-92cc-9b38b3d48266', bucketIdentifier: 'bd6fa6cc-da95-4a12-92cc-9b38b3d48266',
flowIdentifier: 'e884a53c-cbc2-4cb6-9ebd-d9e5d5bb7d05', flowIdentifier: 'e884a53c-cbc2-4cb6-9ebd-d9e5d5bb7d05',
version: 1, version: '1',
timestamp: 1712076498414, timestamp: 1712076498414,
author: 'anonymous', author: 'anonymous',
comments: '' comments: ''

View File

@ -41,7 +41,7 @@ export class ChangeVersionDialog {
new MatTableDataSource<VersionedFlowSnapshotMetadata>(); new MatTableDataSource<VersionedFlowSnapshotMetadata>();
selectedFlowVersion: VersionedFlowSnapshotMetadata | null = null; selectedFlowVersion: VersionedFlowSnapshotMetadata | null = null;
sort: Sort = { sort: Sort = {
active: 'version', active: 'created',
direction: 'desc' direction: 'desc'
}; };
versionControlInformation: VersionControlInformation; versionControlInformation: VersionControlInformation;
@ -76,7 +76,7 @@ export class ChangeVersionDialog {
let retVal = 0; let retVal = 0;
switch (sort.active) { switch (sort.active) {
case 'version': case 'version':
retVal = this.nifiCommon.compareNumber(a.version, b.version); retVal = this.compareVersion(a.version, b.version);
break; break;
case 'created': case 'created':
retVal = this.nifiCommon.compareNumber(a.timestamp, b.timestamp); retVal = this.nifiCommon.compareNumber(a.timestamp, b.timestamp);
@ -89,6 +89,14 @@ export class ChangeVersionDialog {
}); });
} }
private compareVersion(a: string, b: string): number {
if (this.nifiCommon.isNumber(a) && this.nifiCommon.isNumber(b)) {
return this.nifiCommon.compareNumber(parseInt(a, 10), parseInt(b, 10));
} else {
return this.nifiCommon.compareString(a, b);
}
}
select(flowVersion: VersionedFlowSnapshotMetadata): void { select(flowVersion: VersionedFlowSnapshotMetadata): void {
this.selectedFlowVersion = flowVersion; this.selectedFlowVersion = flowVersion;
} }

View File

@ -104,13 +104,13 @@ export class ImportFromRegistry implements OnInit {
selectedFlowDescription: string | undefined; selectedFlowDescription: string | undefined;
sort: Sort = { sort: Sort = {
active: 'version', active: 'created',
direction: 'desc' direction: 'desc'
}; };
displayedColumns: string[] = ['version', 'created', 'comments']; displayedColumns: string[] = ['version', 'created', 'comments'];
dataSource: MatTableDataSource<VersionedFlowSnapshotMetadata> = dataSource: MatTableDataSource<VersionedFlowSnapshotMetadata> =
new MatTableDataSource<VersionedFlowSnapshotMetadata>(); new MatTableDataSource<VersionedFlowSnapshotMetadata>();
selectedFlowVersion: number | null = null; selectedFlowVersion: string | null = null;
constructor( constructor(
@Inject(MAT_DIALOG_DATA) private dialogRequest: ImportFromRegistryDialogRequest, @Inject(MAT_DIALOG_DATA) private dialogRequest: ImportFromRegistryDialogRequest,
@ -158,14 +158,28 @@ export class ImportFromRegistry implements OnInit {
} }
registryChanged(registryId: string): void { registryChanged(registryId: string): void {
this.clearBuckets();
this.loadBuckets(registryId); this.loadBuckets(registryId);
} }
private clearBuckets(): void {
this.bucketOptions = [];
this.importFromRegistryForm.get('bucket')?.setValue(null);
this.clearFlows();
}
bucketChanged(bucketId: string): void { bucketChanged(bucketId: string): void {
this.clearFlows();
const registryId = this.importFromRegistryForm.get('registry')?.value; const registryId = this.importFromRegistryForm.get('registry')?.value;
this.loadFlows(registryId, bucketId); this.loadFlows(registryId, bucketId);
} }
private clearFlows() {
this.importFromRegistryForm.get('flow')?.setValue(null);
this.flowOptions = [];
this.dataSource.data = [];
}
flowChanged(flowId: string): void { flowChanged(flowId: string): void {
const registryId = this.importFromRegistryForm.get('registry')?.value; const registryId = this.importFromRegistryForm.get('registry')?.value;
const bucketId = this.importFromRegistryForm.get('bucket')?.value; const bucketId = this.importFromRegistryForm.get('bucket')?.value;
@ -271,7 +285,7 @@ export class ImportFromRegistry implements OnInit {
let retVal = 0; let retVal = 0;
switch (sort.active) { switch (sort.active) {
case 'version': case 'version':
retVal = this.nifiCommon.compareNumber(a.version, b.version); retVal = this.compareVersion(a.version, b.version);
break; break;
case 'created': case 'created':
retVal = this.nifiCommon.compareNumber(a.timestamp, b.timestamp); retVal = this.nifiCommon.compareNumber(a.timestamp, b.timestamp);
@ -284,6 +298,14 @@ export class ImportFromRegistry implements OnInit {
}); });
} }
private compareVersion(a: string, b: string): number {
if (this.nifiCommon.isNumber(a) && this.nifiCommon.isNumber(b)) {
return this.nifiCommon.compareNumber(parseInt(a, 10), parseInt(b, 10));
} else {
return this.nifiCommon.compareString(a, b);
}
}
select(flowVersion: VersionedFlowSnapshotMetadata): void { select(flowVersion: VersionedFlowSnapshotMetadata): void {
this.selectedFlowVersion = flowVersion.version; this.selectedFlowVersion = flowVersion.version;
} }

View File

@ -72,7 +72,7 @@ describe('LocalChangesDialog', () => {
flowId: '28dc4617-541c-4912-87c8-aad0ae882d33', flowId: '28dc4617-541c-4912-87c8-aad0ae882d33',
flowName: 'asdfasdfa', flowName: 'asdfasdfa',
flowDescription: 'asdf', flowDescription: 'asdf',
version: 2, version: '2',
state: 'LOCALLY_MODIFIED_AND_STALE', state: 'LOCALLY_MODIFIED_AND_STALE',
stateExplanation: 'Local changes have been made and a newer version of this flow is available' stateExplanation: 'Local changes have been made and a newer version of this flow is available'
} }

View File

@ -32,9 +32,4 @@
$material-theme-primary-palette, $material-theme-primary-palette,
default-contrast default-contrast
); );
.save-flow-version-label {
background-color: $material-theme-primary-palette-default;
color: $material-theme-primary-palette-default-contrast;
}
} }

View File

@ -29,14 +29,11 @@
<div>Bucket</div> <div>Bucket</div>
<div class="accent-color font-medium">{{ versionControlInformation.bucketName }}</div> <div class="accent-color font-medium">{{ versionControlInformation.bucketName }}</div>
</div> </div>
<div class="flex"> <div>
<div class="flex-1"> <div>
<div>Flow Name</div> <div>Flow Name</div>
<div class="accent-color font-medium">{{ versionControlInformation.flowName }}</div> <div class="accent-color font-medium">{{ versionControlInformation.flowName }}</div>
</div> </div>
@if (!forceCommit) {
<div class="save-flow-version-label ml-3">{{ versionControlInformation.version + 1 }}</div>
}
</div> </div>
<div> <div>
<div>Flow Description</div> <div>Flow Description</div>
@ -92,12 +89,11 @@
} }
</mat-form-field> </mat-form-field>
<div class="flex w-full"> <div class="w-full">
<mat-form-field class="flex-1"> <mat-form-field class="flex-1">
<mat-label>Flow Name</mat-label> <mat-label>Flow Name</mat-label>
<input matInput formControlName="flowName" type="text" /> <input matInput formControlName="flowName" type="text" />
</mat-form-field> </mat-form-field>
<div class="save-flow-version-label ml-3">1</div>
</div> </div>
<mat-form-field> <mat-form-field>

View File

@ -26,13 +26,4 @@
.mat-mdc-form-field-error { .mat-mdc-form-field-error {
font-size: 12px; font-size: 12px;
} }
.save-flow-version-label {
flex: 0 0 44px;
height: 44px;
line-height: 44px;
text-align: center;
border-radius: 50%;
padding: 0 1px;
}
} }

View File

@ -221,6 +221,19 @@ export class NiFiCommon {
return Array.isArray(arr) ? arr.length === 0 : true; return Array.isArray(arr) ? arr.length === 0 : true;
} }
public isNumber(obj: any) {
if (!obj) {
return false;
}
if (typeof obj === 'number') {
return true;
}
if (obj instanceof Number) {
return true;
}
return typeof obj === 'string' && !isNaN(parseInt(obj, 10));
}
/** /**
* Determines if a string contains another, optionally looking case insensitively. * Determines if a string contains another, optionally looking case insensitively.
* *

View File

@ -568,7 +568,7 @@ export interface VersionedFlowSnapshotMetadataEntity {
export interface VersionedFlowSnapshotMetadata { export interface VersionedFlowSnapshotMetadata {
bucketIdentifier: string; bucketIdentifier: string;
flowIdentifier: string; flowIdentifier: string;
version: number; version: string;
timestamp: number; timestamp: number;
author: string; author: string;
comments: string; comments: string;

View File

@ -38,7 +38,6 @@
<span id="save-flow-version-action" class="hidden"></span> <span id="save-flow-version-action" class="hidden"></span>
<input type="text" id="save-flow-version-name-field" class="setting-input hidden"/> <input type="text" id="save-flow-version-name-field" class="setting-input hidden"/>
<div id="save-flow-version-name" class="hidden"></div> <div id="save-flow-version-name" class="hidden"></div>
<div id="save-flow-version-label"></div>
</div> </div>
</div> </div>
<div class="setting"> <div class="setting">

View File

@ -229,17 +229,6 @@ div.progress-label {
margin-right: 10px; margin-right: 10px;
} }
#save-flow-version-label {
flex: 0 0 30px;
background-color: #728E9B;
color: #fff;
height: 30px;
line-height: 30px;
text-align: center;
border-radius: 50%;
padding: 0 1px;
}
/* /*
Import Flow Version Import Flow Version
*/ */

View File

@ -87,8 +87,6 @@
$('#save-flow-version-registry-combo').combo('destroy').hide(); $('#save-flow-version-registry-combo').combo('destroy').hide();
$('#save-flow-version-bucket-combo').combo('destroy').hide(); $('#save-flow-version-bucket-combo').combo('destroy').hide();
$('#save-flow-version-label').text('');
$('#save-flow-version-registry').text('').hide(); $('#save-flow-version-registry').text('').hide();
$('#save-flow-version-bucket').text('').hide(); $('#save-flow-version-bucket').text('').hide();
@ -464,7 +462,20 @@
return -1; return -1;
} }
return a[sortDetails.columnId] === b[sortDetails.columnId] ? 0 : a[sortDetails.columnId] > b[sortDetails.columnId] ? 1 : -1; if (sortDetails.columnId === "version") {
if (nfCommon.isNumber(a.version) && nfCommon.isNumber(b.version)) {
return a.version === b.version
? 0
: parseInt(a.version, 10) > parseInt(b.version, 10)
? 1
: -1;
}
}
return a[sortDetails.columnId] === b[sortDetails.columnId]
? 0
: a[sortDetails.columnId] > b[sortDetails.columnId]
? 1
: -1;
}; };
// perform the sort // perform the sort
@ -529,7 +540,7 @@
// initialize the sort // initialize the sort
sort({ sort({
columnId: 'version', columnId: 'timestamp',
sortAsc: false sortAsc: false
}, importFlowVersionData); }, importFlowVersionData);
@ -537,7 +548,7 @@
var importFlowVersionGrid = new Slick.Grid(importFlowVersionTable, importFlowVersionData, importFlowVersionColumns, gridOptions); var importFlowVersionGrid = new Slick.Grid(importFlowVersionTable, importFlowVersionData, importFlowVersionColumns, gridOptions);
importFlowVersionGrid.setSelectionModel(new Slick.RowSelectionModel()); importFlowVersionGrid.setSelectionModel(new Slick.RowSelectionModel());
importFlowVersionGrid.registerPlugin(new Slick.AutoTooltips()); importFlowVersionGrid.registerPlugin(new Slick.AutoTooltips());
importFlowVersionGrid.setSortColumn('version', false); importFlowVersionGrid.setSortColumn('timestamp', false);
importFlowVersionGrid.onSort.subscribe(function (e, args) { importFlowVersionGrid.onSort.subscribe(function (e, args) {
sort({ sort({
columnId: args.sortCol.id, columnId: args.sortCol.id,
@ -1822,12 +1833,6 @@
$('#save-flow-version-registry').text(versionControlInformation.registryName).show(); $('#save-flow-version-registry').text(versionControlInformation.registryName).show();
$('#save-flow-version-bucket').text(versionControlInformation.bucketName).show(); $('#save-flow-version-bucket').text(versionControlInformation.bucketName).show();
if (action == 'COMMIT') {
$('#save-flow-version-label').text(versionControlInformation.version + 1).show();
} else {
$('#save-flow-version-label').hide();
}
$('#save-flow-version-name').text(versionControlInformation.flowName).show(); $('#save-flow-version-name').text(versionControlInformation.flowName).show();
nfCommon.populateField('save-flow-version-description', versionControlInformation.flowDescription); nfCommon.populateField('save-flow-version-description', versionControlInformation.flowDescription);
$('#save-flow-version-description').show(); $('#save-flow-version-description').show();
@ -1838,9 +1843,6 @@
// record the type of action (i.e. commit vs force-commit) // record the type of action (i.e. commit vs force-commit)
$('#save-flow-version-action').text(action); $('#save-flow-version-action').text(action);
// reposition the version label
$('#save-flow-version-label').css('margin-top', '-15px');
focusName = false; focusName = false;
deferred.resolve(); deferred.resolve();
} else { } else {
@ -1862,16 +1864,10 @@
}] }]
}).show(); }).show();
// set the initial version
$('#save-flow-version-label').text(1).show();
$('#save-flow-version-name-field').show(); $('#save-flow-version-name-field').show();
$('#save-flow-version-description-field').show(); $('#save-flow-version-description-field').show();
$('#save-flow-version-action').text(action); $('#save-flow-version-action').text(action);
// reposition the version label
$('#save-flow-version-label').css('margin-top', '0');
loadRegistries($('#save-flow-version-dialog'), registryCombo, bucketCombo, null, selectBucketSaveFlowVersion, function (bucketEntity) { loadRegistries($('#save-flow-version-dialog'), registryCombo, bucketCombo, null, selectBucketSaveFlowVersion, function (bucketEntity) {
return bucketEntity.permissions.canWrite === true; return bucketEntity.permissions.canWrite === true;
}).done(function () { }).done(function () {

View File

@ -1673,6 +1673,24 @@
return $.isArray(arr) ? arr.length === 0 : true; return $.isArray(arr) ? arr.length === 0 : true;
}, },
/**
* Determines if the specified input is a number.
* @param obj
* @returns {boolean}
*/
isNumber: function (obj) {
if (!obj) {
return false;
}
if (typeof obj === "number") {
return true;
}
if (obj instanceof Number) {
return true;
}
return typeof obj === "string" && !isNaN(parseInt(obj, 10));
},
/** /**
* Determines if these are the same bulletins. If both arguments are not * Determines if these are the same bulletins. If both arguments are not
* arrays, false is returned. * arrays, false is returned.

View File

@ -33,7 +33,7 @@ public class VersionedFlowConverter {
externalFlowMetadata.setComments(snapshotMetadata.getComments()); externalFlowMetadata.setComments(snapshotMetadata.getComments());
externalFlowMetadata.setFlowIdentifier(snapshotMetadata.getFlowIdentifier()); externalFlowMetadata.setFlowIdentifier(snapshotMetadata.getFlowIdentifier());
externalFlowMetadata.setTimestamp(snapshotMetadata.getTimestamp()); externalFlowMetadata.setTimestamp(snapshotMetadata.getTimestamp());
externalFlowMetadata.setVersion(snapshotMetadata.getVersion()); externalFlowMetadata.setVersion(String.valueOf(snapshotMetadata.getVersion()));
} }
final VersionedFlow versionedFlow = flowSnapshot.getFlow(); final VersionedFlow versionedFlow = flowSnapshot.getFlow();

View File

@ -78,7 +78,7 @@ public class ConciseEvolvingDifferenceDescriptor implements DifferenceDescriptor
// If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation. // If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation.
if (Objects.equals(coordinatesA.getStorageLocation(), coordinatesB.getStorageLocation()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId()) if (Objects.equals(coordinatesA.getStorageLocation(), coordinatesB.getStorageLocation()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId())
&& Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) { && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && !Objects.equals(coordinatesA.getVersion(), coordinatesB.getVersion())) {
description = String.format("Flow Version changed from %s to %s", coordinatesA.getVersion(), coordinatesB.getVersion()); description = String.format("Flow Version changed from %s to %s", coordinatesA.getVersion(), coordinatesB.getVersion());
break; break;

View File

@ -72,7 +72,7 @@ public class StaticDifferenceDescriptor implements DifferenceDescriptor {
// If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation. // If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation.
if (Objects.equals(coordinatesA.getStorageLocation(), coordinatesB.getStorageLocation()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId()) if (Objects.equals(coordinatesA.getStorageLocation(), coordinatesB.getStorageLocation()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId())
&& Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) { && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && !Objects.equals(coordinatesA.getVersion(), coordinatesB.getVersion())) {
description = String.format("Flow Version is %s in %s but %s in %s", coordinatesA.getVersion(), flowAName, coordinatesB.getVersion(), flowBName); description = String.format("Flow Version is %s in %s but %s in %s", coordinatesA.getVersion(), flowAName, coordinatesB.getVersion(), flowBName);
break; break;

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -75,7 +76,7 @@ public class InMemoryFlowRegistry extends AbstractFlowRegistryClient implements
} }
@Override @Override
public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) { public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction registerAction) {
throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED); throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED);
} }
@ -95,7 +96,8 @@ public class InMemoryFlowRegistry extends AbstractFlowRegistryClient implements
} }
@Override @Override
public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version) throws FlowRegistryException { public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version)
throws FlowRegistryException {
if (context.getNiFiUserIdentity().isPresent()) { if (context.getNiFiUserIdentity().isPresent()) {
throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED); throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED);
} }
@ -104,7 +106,7 @@ public class InMemoryFlowRegistry extends AbstractFlowRegistryClient implements
final List<VersionedExternalFlow> snapshots = flowSnapshots.get(flowCoordinates); final List<VersionedExternalFlow> snapshots = flowSnapshots.get(flowCoordinates);
final VersionedExternalFlow registeredFlowSnapshot = snapshots.stream() final VersionedExternalFlow registeredFlowSnapshot = snapshots.stream()
.filter(snapshot -> snapshot.getMetadata().getVersion() == version) .filter(snapshot -> Objects.equals(snapshot.getMetadata().getVersion(), version))
.findAny() .findAny()
.orElseThrow(() -> new FlowRegistryException("Could not find flow: bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version)); .orElseThrow(() -> new FlowRegistryException("Could not find flow: bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version));
@ -138,11 +140,11 @@ public class InMemoryFlowRegistry extends AbstractFlowRegistryClient implements
final VersionedExternalFlowMetadata metadata = versionedExternalFlow.getMetadata(); final VersionedExternalFlowMetadata metadata = versionedExternalFlow.getMetadata();
final String bucketId; final String bucketId;
final String flowId; final String flowId;
final int version; final String version;
if (metadata == null) { if (metadata == null) {
bucketId = DEFAULT_BUCKET_ID; bucketId = DEFAULT_BUCKET_ID;
flowId = "flow-" + flowIdGenerator.getAndIncrement(); flowId = "flow-" + flowIdGenerator.getAndIncrement();
version = 1; version = "1";
} else { } else {
bucketId = metadata.getBucketIdentifier(); bucketId = metadata.getBucketIdentifier();
flowId = metadata.getFlowIdentifier(); flowId = metadata.getFlowIdentifier();
@ -169,7 +171,7 @@ public class InMemoryFlowRegistry extends AbstractFlowRegistryClient implements
} }
@Override @Override
public int getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) { public Optional<String> getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) {
throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED); throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED);
} }

View File

@ -148,7 +148,7 @@ public class RegistryUtil {
final String subRegistryUrl = getBaseRegistryUrl(coordinates.getStorageLocation()); final String subRegistryUrl = getBaseRegistryUrl(coordinates.getStorageLocation());
final String bucketId = coordinates.getBucketId(); final String bucketId = coordinates.getBucketId();
final String flowId = coordinates.getFlowId(); final String flowId = coordinates.getFlowId();
final int version = coordinates.getVersion(); final int version = Integer.parseInt(coordinates.getVersion());
final RegistryUtil subFlowUtil = getSubRegistryUtil(subRegistryUrl); final RegistryUtil subFlowUtil = getSubRegistryUtil(subRegistryUrl);
final VersionedFlowSnapshot snapshot = subFlowUtil.getFlowByID(bucketId, flowId, version); final VersionedFlowSnapshot snapshot = subFlowUtil.getFlowByID(bucketId, flowId, version);

View File

@ -79,7 +79,7 @@ public class TestRegistryUtil {
coordinates.setStorageLocation(storageLocation); coordinates.setStorageLocation(storageLocation);
coordinates.setBucketId(ROOT_BUCKET_ID); coordinates.setBucketId(ROOT_BUCKET_ID);
coordinates.setFlowId(ROOT_FLOW_ID); coordinates.setFlowId(ROOT_FLOW_ID);
coordinates.setVersion(ROOT_VERSION); coordinates.setVersion(String.valueOf(ROOT_VERSION));
final VersionedProcessGroup group = new VersionedProcessGroup(); final VersionedProcessGroup group = new VersionedProcessGroup();
group.setVersionedFlowCoordinates(coordinates); group.setVersionedFlowCoordinates(coordinates);
@ -96,7 +96,7 @@ public class TestRegistryUtil {
coordinates.setStorageLocation(storageLocation); coordinates.setStorageLocation(storageLocation);
coordinates.setBucketId(CHILD_BUCKET_ID); coordinates.setBucketId(CHILD_BUCKET_ID);
coordinates.setFlowId(CHILD_FLOW_ID); coordinates.setFlowId(CHILD_FLOW_ID);
coordinates.setVersion(CHILD_VERSION); coordinates.setVersion(String.valueOf(CHILD_VERSION));
final VersionedProcessGroup group = new VersionedProcessGroup(); final VersionedProcessGroup group = new VersionedProcessGroup();
group.setVersionedFlowCoordinates(coordinates); group.setVersionedFlowCoordinates(coordinates);

View File

@ -30,6 +30,7 @@ import org.apache.nifi.registry.flow.AbstractFlowRegistryClient;
import org.apache.nifi.registry.flow.FlowRegistryBucket; import org.apache.nifi.registry.flow.FlowRegistryBucket;
import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext; import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext;
import org.apache.nifi.registry.flow.FlowRegistryPermissions; import org.apache.nifi.registry.flow.FlowRegistryPermissions;
import org.apache.nifi.registry.flow.RegisterAction;
import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
@ -209,7 +210,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
} }
@Override @Override
public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version) throws IOException { public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version) throws IOException {
final File flowDir = getFlowDirectory(context, bucketId, flowId); final File flowDir = getFlowDirectory(context, bucketId, flowId);
final Pattern intPattern = Pattern.compile("\\d+"); final Pattern intPattern = Pattern.compile("\\d+");
final File[] versionFiles = flowDir.listFiles(file -> intPattern.matcher(file.getName()).matches()); final File[] versionFiles = flowDir.listFiles(file -> intPattern.matcher(file.getName()).matches());
@ -219,7 +220,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
try (final JsonParser parser = factory.createParser(snapshotFile)) { try (final JsonParser parser = factory.createParser(snapshotFile)) {
final RegisteredFlowSnapshot snapshot = parser.readValueAs(RegisteredFlowSnapshot.class); final RegisteredFlowSnapshot snapshot = parser.readValueAs(RegisteredFlowSnapshot.class);
populateBucket(snapshot, bucketId); populateBucket(snapshot, bucketId);
populateFlow(snapshot, bucketId, flowId, version, versionFiles == null ? 0 : versionFiles.length); populateFlow(snapshot, bucketId, flowId, versionFiles == null ? 0 : versionFiles.length);
return snapshot; return snapshot;
} }
@ -241,7 +242,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
snapshot.getSnapshotMetadata().setBucketIdentifier(bucketId); snapshot.getSnapshotMetadata().setBucketIdentifier(bucketId);
} }
private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bucketId, final String flowId, final int version, final int numVersions) { private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bucketId, final String flowId, final int numVersions) {
final RegisteredFlow existingFlow = snapshot.getFlow(); final RegisteredFlow existingFlow = snapshot.getFlow();
if (existingFlow != null) { if (existingFlow != null) {
return; return;
@ -258,7 +259,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
flow.setVersionCount(numVersions); flow.setVersionCount(numVersions);
final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo(); final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo();
versionInfo.setVersion(version); versionInfo.setVersion(numVersions);
flow.setVersionInfo(versionInfo); flow.setVersionInfo(versionInfo);
snapshot.setFlow(flow); snapshot.setFlow(flow);
@ -266,15 +267,17 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
} }
@Override @Override
public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws IOException { public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot, final RegisterAction registerAction)
throws IOException {
final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
final String bucketId = metadata.getBucketIdentifier(); final String bucketId = metadata.getBucketIdentifier();
final String flowId = metadata.getFlowIdentifier(); final String flowId = metadata.getFlowIdentifier();
final File flowDir = getFlowDirectory(context, bucketId, flowId); final File flowDir = getFlowDirectory(context, bucketId, flowId);
final long version = metadata.getVersion(); final String version = metadata.getVersion() == null ? "1" : String.valueOf(Integer.parseInt(metadata.getVersion()) + 1);
final File versionDir = getChildLocation(flowDir, Paths.get(String.valueOf(version))); flowSnapshot.getSnapshotMetadata().setVersion(version);
// Create the directory for the version, if it doesn't exist. // Create the directory for the version, if it doesn't exist.
final File versionDir = getChildLocation(flowDir, Paths.get(version));
if (!versionDir.exists()) { if (!versionDir.exists()) {
Files.createDirectories(versionDir.toPath()); Files.createDirectories(versionDir.toPath());
} }
@ -364,7 +367,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
final String versionName = versionDir.getName(); final String versionName = versionDir.getName();
final RegisteredFlowSnapshotMetadata metadata = new RegisteredFlowSnapshotMetadata(); final RegisteredFlowSnapshotMetadata metadata = new RegisteredFlowSnapshotMetadata();
metadata.setVersion(Integer.parseInt(versionName)); metadata.setVersion(versionName);
metadata.setTimestamp(versionDir.lastModified()); metadata.setTimestamp(versionDir.lastModified());
metadata.setFlowIdentifier(flowId); metadata.setFlowIdentifier(flowId);
metadata.setBucketIdentifier(bucketId); metadata.setBucketIdentifier(bucketId);
@ -376,7 +379,12 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
} }
@Override @Override
public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException { public Optional<String> getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
final int latestVersion = getLatestFlowVersionInt(context, bucketId, flowId);
return latestVersion == -1 ? Optional.empty() : Optional.of(String.valueOf(latestVersion));
}
private int getLatestFlowVersionInt(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
final File flowDir = getFlowDirectory(context, bucketId, flowId); final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles(); final File[] versionDirs = flowDir.listFiles();
if (versionDirs == null) { if (versionDirs == null) {
@ -390,7 +398,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
return greatestValue.orElse(-1); return greatestValue.orElse(-1);
} }
private File getSnapshotFile(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final long version) { private File getSnapshotFile(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version) {
final File flowDirectory = getFlowDirectory(context, bucketId, flowId); final File flowDirectory = getFlowDirectory(context, bucketId, flowId);
final File versionDirectory = getChildLocation(flowDirectory, Paths.get(String.valueOf(version))); final File versionDirectory = getChildLocation(flowDirectory, Paths.get(String.valueOf(version)));
return new File(versionDirectory, "snapshot.json"); return new File(versionDirectory, "snapshot.json");

View File

@ -1794,7 +1794,7 @@ public class NiFiClientUtil {
return importFlowFromRegistry(parentGroupId, vciDto.getRegistryId(), vciDto.getBucketId(), vciDto.getFlowId(), vciDto.getVersion()); return importFlowFromRegistry(parentGroupId, vciDto.getRegistryId(), vciDto.getBucketId(), vciDto.getFlowId(), vciDto.getVersion());
} }
public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, final String registryClientId, final String bucketId, final String flowId, final int version) public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, final String registryClientId, final String bucketId, final String flowId, final String version)
throws NiFiClientException, IOException { throws NiFiClientException, IOException {
final VersionControlInformationDTO vci = new VersionControlInformationDTO(); final VersionControlInformationDTO vci = new VersionControlInformationDTO();
@ -1813,11 +1813,11 @@ public class NiFiClientUtil {
return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, groupEntity); return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, groupEntity);
} }
public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version) throws NiFiClientException, IOException, InterruptedException { public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final String version) throws NiFiClientException, IOException, InterruptedException {
return changeFlowVersion(processGroupId, version, true); return changeFlowVersion(processGroupId, version, true);
} }
public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version, final boolean throwOnFailure) public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final String version, final boolean throwOnFailure)
throws NiFiClientException, IOException, InterruptedException { throws NiFiClientException, IOException, InterruptedException {
logger.info("Submitting Change Flow Version request to change Group with ID {} to Version {}", processGroupId, version); logger.info("Submitting Change Flow Version request to change Group with ID {} to Version {}", processGroupId, version);

View File

@ -48,7 +48,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -103,7 +102,7 @@ public class RegistryClientIT extends NiFiSystemIT {
util.assertFlowUpToDate(child.getId()); util.assertFlowUpToDate(child.getId());
// Verify that we are able to switch back to v1 while everything is stopped // Verify that we are able to switch back to v1 while everything is stopped
util.changeFlowVersion(parent.getId(), 1); util.changeFlowVersion(parent.getId(), "1");
util.assertFlowStaleAndUnmodified(parent.getId()); util.assertFlowStaleAndUnmodified(parent.getId());
util.assertFlowStaleAndUnmodified(child.getId()); util.assertFlowStaleAndUnmodified(child.getId());
@ -119,7 +118,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertEquals("Updated", contents); assertEquals("Updated", contents);
// Switch Version back to v2 while it's running // Switch Version back to v2 while it's running
util.changeFlowVersion(parent.getId(), 2); util.changeFlowVersion(parent.getId(), "2");
util.assertFlowUpToDate(parent.getId()); util.assertFlowUpToDate(parent.getId());
util.assertFlowUpToDate(child.getId()); util.assertFlowUpToDate(child.getId());
@ -133,7 +132,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertEquals("Updated v2", secondFlowFileContents); assertEquals("Updated v2", secondFlowFileContents);
// Switch back to v1 while flow is running to verify that the version can change back to a lower version as well // Switch back to v1 while flow is running to verify that the version can change back to a lower version as well
util.changeFlowVersion(parent.getId(), 1); util.changeFlowVersion(parent.getId(), "1");
util.assertFlowStaleAndUnmodified(parent.getId()); util.assertFlowStaleAndUnmodified(parent.getId());
util.assertFlowStaleAndUnmodified(child.getId()); util.assertFlowStaleAndUnmodified(child.getId());
@ -192,8 +191,8 @@ public class RegistryClientIT extends NiFiSystemIT {
// Save the flow as v2 // Save the flow as v2
util.saveFlowVersion(parent, clientEntity, v1Vci); util.saveFlowVersion(parent, clientEntity, v1Vci);
util.changeFlowVersion(parent.getId(), 1); util.changeFlowVersion(parent.getId(), "1");
util.changeFlowVersion(parent.getId(), 2); util.changeFlowVersion(parent.getId(), "2");
} }
@ -225,7 +224,7 @@ public class RegistryClientIT extends NiFiSystemIT {
util.saveFlowVersion(group, clientEntity, vci); util.saveFlowVersion(group, clientEntity, vci);
// Change back to v1 and start the flow // Change back to v1 and start the flow
util.changeFlowVersion(group.getId(), 1); util.changeFlowVersion(group.getId(), "1");
util.assertFlowStaleAndUnmodified(group.getId()); util.assertFlowStaleAndUnmodified(group.getId());
util.enableControllerService(service); util.enableControllerService(service);
@ -240,14 +239,14 @@ public class RegistryClientIT extends NiFiSystemIT {
assertEquals("1", firstFlowFileAttributes.get("count")); assertEquals("1", firstFlowFileAttributes.get("count"));
// Change to v2 and ensure that the output is correct // Change to v2 and ensure that the output is correct
util.changeFlowVersion(group.getId(), 2); util.changeFlowVersion(group.getId(), "2");
util.assertFlowUpToDate(group.getId()); util.assertFlowUpToDate(group.getId());
waitForQueueCount(connectionToTerminate.getId(), 2 * getNumberOfNodes()); waitForQueueCount(connectionToTerminate.getId(), 2 * getNumberOfNodes());
final Map<String, String> secondFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes()).getFlowFile().getAttributes(); final Map<String, String> secondFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes()).getFlowFile().getAttributes();
assertEquals("2001", secondFlowFileAttributes.get("count")); assertEquals("2001", secondFlowFileAttributes.get("count"));
// Change back to v1 and ensure that the output is correct. It should reset count back to 0. // Change back to v1 and ensure that the output is correct. It should reset count back to 0.
util.changeFlowVersion(group.getId(), 1); util.changeFlowVersion(group.getId(), "1");
util.assertFlowStaleAndUnmodified(group.getId()); util.assertFlowStaleAndUnmodified(group.getId());
waitForQueueCount(connectionToTerminate.getId(), 3 * getNumberOfNodes()); waitForQueueCount(connectionToTerminate.getId(), 3 * getNumberOfNodes());
final Map<String, String> thirdFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes() * 2).getFlowFile().getAttributes(); final Map<String, String> thirdFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes() * 2).getFlowFile().getAttributes();
@ -259,7 +258,7 @@ public class RegistryClientIT extends NiFiSystemIT {
public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException { public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows")); final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "port-moved-groups", 1); final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "port-moved-groups", "1");
assertNotNull(imported); assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId()); getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
@ -267,13 +266,13 @@ public class RegistryClientIT extends NiFiSystemIT {
final FlowSnippetDTO groupContents = imported.getComponent().getContents(); final FlowSnippetDTO groupContents = imported.getComponent().getContents();
final List<ProcessorDTO> replaceTextProcessors = groupContents.getProcessors().stream() final List<ProcessorDTO> replaceTextProcessors = groupContents.getProcessors().stream()
.filter(proc -> proc.getName().equals("ReplaceText")) .filter(proc -> proc.getName().equals("ReplaceText"))
.collect(Collectors.toList()); .toList();
assertEquals(1, replaceTextProcessors.size()); assertEquals(1, replaceTextProcessors.size());
assertTrue(groupContents.getInputPorts().isEmpty()); assertTrue(groupContents.getInputPorts().isEmpty());
// Change to version 2 // Change to version 2
final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2); final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), "2");
assertNull(version2Result.getRequest().getFailureReason()); assertNull(version2Result.getRequest().getFailureReason());
final FlowDTO v2Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow(); final FlowDTO v2Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
@ -291,7 +290,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertEquals(1, v2Contents.getInputPorts().size()); assertEquals(1, v2Contents.getInputPorts().size());
// Change back to Version 1 // Change back to Version 1
final VersionedFlowUpdateRequestEntity changeBackToV1Result = getClientUtil().changeFlowVersion(imported.getId(), 1); final VersionedFlowUpdateRequestEntity changeBackToV1Result = getClientUtil().changeFlowVersion(imported.getId(), "1");
assertNull(changeBackToV1Result.getRequest().getFailureReason()); assertNull(changeBackToV1Result.getRequest().getFailureReason());
final FlowDTO v1Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow(); final FlowDTO v1Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
@ -306,11 +305,11 @@ public class RegistryClientIT extends NiFiSystemIT {
public void testRollbackOnFailure() throws NiFiClientException, IOException, InterruptedException { public void testRollbackOnFailure() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows")); final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "flow-with-invalid-connection", 1); final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "flow-with-invalid-connection", "1");
assertNotNull(imported); assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId()); getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2, false); final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), "2", false);
final String failureReason = version2Result.getRequest().getFailureReason(); final String failureReason = version2Result.getRequest().getFailureReason();
assertNotNull(failureReason); assertNotNull(failureReason);
@ -386,7 +385,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessGroupEntity inner1 = getClientUtil().createProcessGroup("Inner 1", outerGroup.getId()); final ProcessGroupEntity inner1 = getClientUtil().createProcessGroup("Inner 1", outerGroup.getId());
ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", inner1.getId()); ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", inner1.getId());
VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(2, vciEntity.getVersionControlInformation().getVersion()); assertEquals("2", vciEntity.getVersionControlInformation().getVersion());
// Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id // Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id
terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId());
@ -408,7 +407,7 @@ public class RegistryClientIT extends NiFiSystemIT {
// First Control again with the newly created components // First Control again with the newly created components
vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(3, vciEntity.getVersionControlInformation().getVersion()); assertEquals("3", vciEntity.getVersionControlInformation().getVersion());
// Get new version of terminate2 processor and terminate1 processor. Ensure that both have version control ID's but that they are different. // Get new version of terminate2 processor and terminate1 processor. Ensure that both have version control ID's but that they are different.
terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId());
@ -430,7 +429,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessGroupEntity innerGroup = getClientUtil().createProcessGroup("Inner 1", topLevel1.getId()); final ProcessGroupEntity innerGroup = getClientUtil().createProcessGroup("Inner 1", topLevel1.getId());
ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId()); ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId());
VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(1, vciEntity.getVersionControlInformation().getVersion()); assertEquals("1", vciEntity.getVersionControlInformation().getVersion());
// Now that the inner group is under version control, copy it and paste it to a new PG. // Now that the inner group is under version control, copy it and paste it to a new PG.
// This should result in the pasted Process Group having a processor with the same Versioned Component ID, because the Processors // This should result in the pasted Process Group having a processor with the same Versioned Component ID, because the Processors

View File

@ -761,14 +761,14 @@ public class StatelessBasicsIT extends NiFiSystemIT {
getClientUtil().stopProcessor(generate); getClientUtil().stopProcessor(generate);
// Switch back to v1 while flow is running // Switch back to v1 while flow is running
getClientUtil().changeFlowVersion(statelessGroup.getId(), 1); getClientUtil().changeFlowVersion(statelessGroup.getId(), "1");
getClientUtil().startProcessor(generate); getClientUtil().startProcessor(generate);
waitForQueueCount(outputToTerminate, 2); waitForQueueCount(outputToTerminate, 2);
assertEquals(HELLO_WORLD, getClientUtil().getFlowFileContentAsUtf8(outputToTerminate.getId(), 1)); assertEquals(HELLO_WORLD, getClientUtil().getFlowFileContentAsUtf8(outputToTerminate.getId(), 1));
getClientUtil().stopProcessor(generate); getClientUtil().stopProcessor(generate);
// Switch back to v2 while flow is running // Switch back to v2 while flow is running
getClientUtil().changeFlowVersion(statelessGroup.getId(), 2); getClientUtil().changeFlowVersion(statelessGroup.getId(), "2");
getClientUtil().startProcessor(generate); getClientUtil().startProcessor(generate);
waitForQueueCount(outputToTerminate, 3); waitForQueueCount(outputToTerminate, 3);
assertEquals(HELLO_WORLD_REVERSED, getClientUtil().getFlowFileContentAsUtf8(outputToTerminate.getId(), 2)); assertEquals(HELLO_WORLD_REVERSED, getClientUtil().getFlowFileContentAsUtf8(outputToTerminate.getId(), 2));

View File

@ -17,6 +17,7 @@
package org.apache.nifi.toolkit.cli.impl.command.nifi.pg; package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;
import org.apache.commons.cli.MissingOptionException; import org.apache.commons.cli.MissingOptionException;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
import org.apache.nifi.toolkit.cli.api.CommandException; import org.apache.nifi.toolkit.cli.api.CommandException;
import org.apache.nifi.toolkit.cli.api.Context; import org.apache.nifi.toolkit.cli.api.Context;
import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient;
@ -33,6 +34,7 @@ import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity; import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import java.util.Properties; import java.util.Properties;
/** /**
@ -71,13 +73,13 @@ public class PGChangeVersion extends AbstractNiFiCommand<VoidResult> {
} }
// start with the version specified in the arguments // start with the version specified in the arguments
Integer newVersion = getIntArg(properties, CommandOption.FLOW_VERSION); String newVersion = getArg(properties, CommandOption.FLOW_VERSION);
// if no version was specified, automatically determine the latest and change to that // if no version was specified, automatically determine the latest and change to that
if (newVersion == null) { if (newVersion == null) {
newVersion = getLatestVersion(client, existingVersionControlDTO); newVersion = getLatestVersion(client, existingVersionControlDTO);
if (newVersion.intValue() == existingVersionControlDTO.getVersion().intValue()) { if (Objects.equals(newVersion, existingVersionControlDTO.getVersion())) {
throw new NiFiClientException("Process group already at latest version"); throw new NiFiClientException("Process group already at latest version");
} }
} }
@ -121,7 +123,7 @@ public class PGChangeVersion extends AbstractNiFiCommand<VoidResult> {
return VoidResult.getInstance(); return VoidResult.getInstance();
} }
private int getLatestVersion(final NiFiClient client, final VersionControlInformationDTO existingVersionControlDTO) private String getLatestVersion(final NiFiClient client, final VersionControlInformationDTO existingVersionControlDTO)
throws NiFiClientException, IOException { throws NiFiClientException, IOException {
final FlowClient flowClient = client.getFlowClient(); final FlowClient flowClient = client.getFlowClient();
@ -133,11 +135,17 @@ public class PGChangeVersion extends AbstractNiFiCommand<VoidResult> {
if (versions.getVersionedFlowSnapshotMetadataSet() == null || versions.getVersionedFlowSnapshotMetadataSet().isEmpty()) { if (versions.getVersionedFlowSnapshotMetadataSet() == null || versions.getVersionedFlowSnapshotMetadataSet().isEmpty()) {
throw new NiFiClientException("No versions available"); throw new NiFiClientException("No versions available");
} }
return getLatestVersion(versions);
}
int latestVersion = 1; private static String getLatestVersion(final VersionedFlowSnapshotMetadataSetEntity versions) {
long latestTimestamp = 0;
String latestVersion = null;
for (VersionedFlowSnapshotMetadataEntity version : versions.getVersionedFlowSnapshotMetadataSet()) { for (VersionedFlowSnapshotMetadataEntity version : versions.getVersionedFlowSnapshotMetadataSet()) {
if (version.getVersionedFlowSnapshotMetadata().getVersion() > latestVersion) { final RegisteredFlowSnapshotMetadata versionMetadata = version.getVersionedFlowSnapshotMetadata();
latestVersion = version.getVersionedFlowSnapshotMetadata().getVersion(); if (versionMetadata.getTimestamp() > latestTimestamp) {
latestTimestamp = versionMetadata.getTimestamp();
latestVersion = versionMetadata.getVersion();
} }
} }
return latestVersion; return latestVersion;

View File

@ -74,7 +74,7 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
final String bucketId = getRequiredArg(properties, CommandOption.BUCKET_ID); final String bucketId = getRequiredArg(properties, CommandOption.BUCKET_ID);
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID); final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
final Integer flowVersion = getRequiredIntArg(properties, CommandOption.FLOW_VERSION); final String flowVersion = getRequiredArg(properties, CommandOption.FLOW_VERSION);
final String posXStr = getArg(properties, CommandOption.POS_X); final String posXStr = getArg(properties, CommandOption.POS_X);
final String posYStr = getArg(properties, CommandOption.POS_Y); final String posYStr = getArg(properties, CommandOption.POS_Y);

View File

@ -17,19 +17,21 @@
package org.apache.nifi.toolkit.cli.impl.result.registry; package org.apache.nifi.toolkit.cli.impl.result.registry;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.toolkit.cli.api.ResultType; import org.apache.nifi.toolkit.cli.api.ResultType;
import org.apache.nifi.toolkit.cli.api.WritableResult;
import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult; import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult;
import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Comparator;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
/** /**
* Result for VersionedFlowSnapshotMetadataSetEntity. * Result for VersionedFlowSnapshotMetadataSetEntity.
@ -56,24 +58,34 @@ public class VersionedFlowSnapshotMetadataSetResult extends AbstractWritableResu
return; return;
} }
// this will be sorted by the child result below // sort by timestamp
final List<VersionedFlowSnapshotMetadata> snapshots = entities.stream() final List<RegisteredFlowSnapshotMetadata> snapshots = entities.stream()
.map(v -> v.getVersionedFlowSnapshotMetadata()) .map(VersionedFlowSnapshotMetadataEntity::getVersionedFlowSnapshotMetadata)
.map(this::convert) .sorted(Comparator.comparing(RegisteredFlowSnapshotMetadata::getTimestamp))
.collect(Collectors.toList()); .toList();
final WritableResult<List<VersionedFlowSnapshotMetadata>> result = new RegisteredFlowSnapshotMetadataResult(resultType, snapshots); // date length, with locale specifics
result.write(output); final String datePattern = "%1$ta, %<tb %<td %<tY %<tR %<tZ";
final int dateLength = String.format(datePattern, new Date()).length();
final Table table = new Table.Builder()
.column("Ver", 3, 3, false)
.column("Date", dateLength, dateLength, false)
.column("Author", 20, 200, true)
.column("Message", 8, 40, true)
.build();
snapshots.forEach(vfs -> {
table.addRow(
vfs.getVersion(),
String.format(datePattern, new Date(vfs.getTimestamp())),
vfs.getAuthor() == null ? "" : vfs.getAuthor(),
vfs.getComments() == null ? "" : vfs.getComments()
);
});
final TableWriter tableWriter = new DynamicTableWriter();
tableWriter.write(table, output);
} }
private VersionedFlowSnapshotMetadata convert(RegisteredFlowSnapshotMetadata metadata) {
final VersionedFlowSnapshotMetadata result = new VersionedFlowSnapshotMetadata();
result.setComments(metadata.getComments());
result.setVersion(metadata.getVersion());
result.setAuthor(metadata.getAuthor());
result.setTimestamp(metadata.getTimestamp());
result.setFlowIdentifier(metadata.getFlowIdentifier());
result.setBucketIdentifier(metadata.getBucketIdentifier());
return result;
}
} }