From 0b0ba1eae366475da3687a94d7c96719bef6ee8d Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 18 Jun 2018 10:52:01 -0400 Subject: [PATCH] NIFI-5319 Utilize NiFi Registry 0.2.0 client This closes #2801. Signed-off-by: Andy LoPresto --- .../nifi-framework-bundle/pom.xml | 2 +- nifi-toolkit/nifi-toolkit-cli/pom.xml | 2 +- .../registry/AbstractNiFiRegistryCommand.java | 24 ++------------- .../command/registry/flow/DeleteFlow.java | 9 +++--- .../registry/flow/ExportFlowVersion.java | 7 ++--- .../registry/flow/ImportFlowVersion.java | 10 ++++--- .../registry/flow/ListFlowVersions.java | 3 +- .../registry/flow/SyncFlowVersions.java | 30 +++++++++++++++---- .../registry/flow/TransferFlowVersion.java | 25 ++++++++++------ 9 files changed, 58 insertions(+), 54 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/pom.xml index 2437fae31e..ece9124dc4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/pom.xml @@ -23,7 +23,7 @@ pom NiFi: Framework Bundle - 0.1.0 + 0.2.0 2.26 4.3.10.RELEASE 4.2.4.RELEASE diff --git a/nifi-toolkit/nifi-toolkit-cli/pom.xml b/nifi-toolkit/nifi-toolkit-cli/pom.xml index e75462d19a..ca8874791f 100644 --- a/nifi-toolkit/nifi-toolkit-cli/pom.xml +++ b/nifi-toolkit/nifi-toolkit-cli/pom.xml @@ -64,7 +64,7 @@ org.apache.nifi.registry nifi-registry-client - 0.1.0 + 0.2.0 com.fasterxml.jackson.core diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/AbstractNiFiRegistryCommand.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/AbstractNiFiRegistryCommand.java index 4ed3430188..2556bc4bf5 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/AbstractNiFiRegistryCommand.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/AbstractNiFiRegistryCommand.java @@ -18,7 +18,6 @@ package org.apache.nifi.toolkit.cli.impl.command.registry; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.registry.bucket.BucketItem; import org.apache.nifi.registry.client.FlowSnapshotClient; import org.apache.nifi.registry.client.NiFiRegistryClient; import org.apache.nifi.registry.client.NiFiRegistryException; @@ -33,7 +32,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; -import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; @@ -71,28 +69,10 @@ public abstract class AbstractNiFiRegistryCommand extends Abst public abstract R doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException; - /* - * NOTE: This will bring back every item in the registry. We should create an end-point on the registry side - * to retrieve a flow by id and remove this later. - */ - protected String getBucketId(final NiFiRegistryClient client, final String flowId) throws IOException, NiFiRegistryException { - final List items = client.getItemsClient().getAll(); - - final Optional matchingItem = items.stream() - .filter(i -> i.getIdentifier().equals(flowId)) - .findFirst(); - - if (!matchingItem.isPresent()) { - throw new NiFiRegistryException("Versioned flow does not exist with id " + flowId); - } - - return matchingItem.get().getBucketIdentifier(); - } - - protected List getVersions(final NiFiRegistryClient client, final String bucketId, final String flowId) + protected List getVersions(final NiFiRegistryClient client, final String flowId) throws NiFiRegistryException, IOException { final FlowSnapshotClient srcSnapshotClient = client.getFlowSnapshotClient(); - final List srcVersionMetadata = srcSnapshotClient.getSnapshotMetadata(bucketId, flowId); + final List srcVersionMetadata = srcSnapshotClient.getSnapshotMetadata(flowId); return srcVersionMetadata.stream().map(s -> s.getVersion()).collect(Collectors.toList()); } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/DeleteFlow.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/DeleteFlow.java index f83434849b..d18ab2ed7e 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/DeleteFlow.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/DeleteFlow.java @@ -21,6 +21,7 @@ import org.apache.nifi.registry.client.FlowClient; import org.apache.nifi.registry.client.FlowSnapshotClient; import org.apache.nifi.registry.client.NiFiRegistryClient; import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.toolkit.cli.api.Context; import org.apache.nifi.toolkit.cli.impl.command.CommandOption; @@ -58,16 +59,16 @@ public class DeleteFlow extends AbstractNiFiRegistryCommand { final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID); final boolean forceDelete = properties.containsKey(CommandOption.FORCE.getLongName()); - final String bucketId = getBucketId(client, flowId); + final FlowClient flowClient = client.getFlowClient(); + final VersionedFlow versionedFlow = flowClient.get(flowId); final FlowSnapshotClient flowSnapshotClient = client.getFlowSnapshotClient(); - final List snapshotMetadata = flowSnapshotClient.getSnapshotMetadata(bucketId, flowId); + final List snapshotMetadata = flowSnapshotClient.getSnapshotMetadata(flowId); if (snapshotMetadata != null && snapshotMetadata.size() > 0 && !forceDelete) { throw new NiFiRegistryException("Flow has versions, use --" + CommandOption.FORCE.getLongName() + " to delete"); } else { - final FlowClient flowClient = client.getFlowClient(); - flowClient.delete(bucketId, flowId); + flowClient.delete(versionedFlow.getBucketIdentifier(), versionedFlow.getIdentifier()); return new OkResult(getContext().isInteractive()); } } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ExportFlowVersion.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ExportFlowVersion.java index ef40bed616..10b2b4599d 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ExportFlowVersion.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ExportFlowVersion.java @@ -53,15 +53,12 @@ public class ExportFlowVersion extends AbstractNiFiRegistryCommand contents = IOUtils.toString(uri, StandardCharsets.UTF_8); } + final FlowClient flowClient = client.getFlowClient(); final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient(); final ObjectMapper objectMapper = JacksonUtils.getObjectMapper(); @@ -84,13 +87,12 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand throw new IOException("Unable to deserialize flow version from " + inputFile); } - // determine the bucket for the provided flow id - final String bucketId = getBucketId(client, flowId); + final VersionedFlow versionedFlow = flowClient.get(flowId); // determine the latest existing version in the destination system Integer version; try { - final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(bucketId, flowId); + final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(flowId); version = latestMetadata.getVersion() + 1; } catch (NiFiRegistryException e) { // when there are no versions it produces a 404 not found @@ -99,7 +101,7 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand // create new metadata using the passed in bucket and flow in the target registry, keep comments final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); - metadata.setBucketIdentifier(bucketId); + metadata.setBucketIdentifier(versionedFlow.getBucketIdentifier()); metadata.setFlowIdentifier(flowId); metadata.setVersion(version); metadata.setComments(deserializedSnapshot.getSnapshotMetadata().getComments()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ListFlowVersions.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ListFlowVersions.java index 5e64205bcb..a40b04bc13 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ListFlowVersions.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ListFlowVersions.java @@ -53,10 +53,9 @@ public class ListFlowVersions extends AbstractNiFiRegistryCommand snapshotMetadata = snapshotClient.getSnapshotMetadata(bucket, flow); + final List snapshotMetadata = snapshotClient.getSnapshotMetadata(flow); return new VersionedFlowSnapshotMetadataResult(getResultType(properties), snapshotMetadata); } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/SyncFlowVersions.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/SyncFlowVersions.java index 2cee5a56dc..8c560bf345 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/SyncFlowVersions.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/SyncFlowVersions.java @@ -19,6 +19,7 @@ package org.apache.nifi.toolkit.cli.impl.command.registry.flow; import org.apache.commons.cli.ParseException; import org.apache.nifi.registry.client.NiFiRegistryClient; import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.toolkit.cli.api.Context; @@ -71,11 +72,28 @@ public class SyncFlowVersions extends AbstractNiFiRegistryCommand final NiFiRegistryClient srcClient = getSourceClient(client, srcPropsValue); - final String srcBucketId = getBucketId(srcClient, srcFlowId); - final String destBucketId = getBucketId(client, destFlowId); + // ensure source flow exists - final List srcVersions = getVersions(srcClient, srcBucketId, srcFlowId); - final List destVersions = getVersions(client, destBucketId, destFlowId); + final VersionedFlow srcFlow; + try { + srcFlow = srcClient.getFlowClient().get(srcFlowId); + } catch (Exception e) { + throw new NiFiRegistryException("Error retrieving source flow : " + e.getMessage(), e); + } + + // ensure destination flow exists + + final VersionedFlow destFlow; + try { + destFlow = client.getFlowClient().get(destFlowId); + } catch (Exception e) { + throw new NiFiRegistryException("Error retrieving destination flow : " + e.getMessage(), e); + } + + // get version list for source and dest + + final List srcVersions = getVersions(srcClient, srcFlow.getIdentifier()); + final List destVersions = getVersions(client, destFlow.getIdentifier()); if (destVersions.size() > srcVersions.size()) { throw new NiFiRegistryException("Destination flow has more versions than source flow"); @@ -95,12 +113,12 @@ public class SyncFlowVersions extends AbstractNiFiRegistryCommand Collections.sort(srcVersions); for (final Integer srcVersion : srcVersions) { - final VersionedFlowSnapshot srcFlowSnapshot = srcClient.getFlowSnapshotClient().get(srcBucketId, srcFlowId, srcVersion); + final VersionedFlowSnapshot srcFlowSnapshot = srcClient.getFlowSnapshotClient().get(srcFlowId, srcVersion); srcFlowSnapshot.setFlow(null); srcFlowSnapshot.setBucket(null); final VersionedFlowSnapshotMetadata destMetadata = new VersionedFlowSnapshotMetadata(); - destMetadata.setBucketIdentifier(destBucketId); + destMetadata.setBucketIdentifier(destFlow.getBucketIdentifier()); destMetadata.setFlowIdentifier(destFlowId); destMetadata.setVersion(srcVersion); destMetadata.setComments(srcFlowSnapshot.getSnapshotMetadata().getComments()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/TransferFlowVersion.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/TransferFlowVersion.java index 08f003527b..5d5ac048d8 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/TransferFlowVersion.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/TransferFlowVersion.java @@ -19,6 +19,7 @@ package org.apache.nifi.toolkit.cli.impl.command.registry.flow; import org.apache.commons.cli.ParseException; import org.apache.nifi.registry.client.NiFiRegistryClient; import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.toolkit.cli.api.Context; @@ -74,25 +75,31 @@ public class TransferFlowVersion extends AbstractNiFiRegistryCommand destVersions = getVersions(client, destBucketId, destFlowId); + final List destVersions = getVersions(client, destFlow.getIdentifier()); final Integer destFlowVersion = destVersions.isEmpty() ? 1 : destVersions.get(0) + 1; // create the new metadata for the destination snapshot final VersionedFlowSnapshotMetadata destMetadata = new VersionedFlowSnapshotMetadata(); - destMetadata.setBucketIdentifier(destBucketId); + destMetadata.setBucketIdentifier(destFlow.getBucketIdentifier()); destMetadata.setFlowIdentifier(destFlowId); destMetadata.setVersion(destFlowVersion); destMetadata.setComments(srcSnapshot.getSnapshotMetadata().getComments()); @@ -107,7 +114,7 @@ public class TransferFlowVersion extends AbstractNiFiRegistryCommand