NIFI-5319 Utilize NiFi Registry 0.2.0 client

This closes #2801.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Bryan Bende 2018-06-18 10:52:01 -04:00 committed by Andy LoPresto
parent d4d9259664
commit 0b0ba1eae3
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
9 changed files with 58 additions and 54 deletions

View File

@ -23,7 +23,7 @@
<packaging>pom</packaging>
<description>NiFi: Framework Bundle</description>
<properties>
<nifi.registry.version>0.1.0</nifi.registry.version>
<nifi.registry.version>0.2.0</nifi.registry.version>
<jersey.version>2.26</jersey.version>
<spring.version>4.3.10.RELEASE</spring.version>
<spring.security.version>4.2.4.RELEASE</spring.security.version>

View File

@ -64,7 +64,7 @@
<dependency>
<groupId>org.apache.nifi.registry</groupId>
<artifactId>nifi-registry-client</artifactId>
<version>0.1.0</version>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>

View File

@ -18,7 +18,6 @@ package org.apache.nifi.toolkit.cli.impl.command.registry;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.bucket.BucketItem;
import org.apache.nifi.registry.client.FlowSnapshotClient;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
@ -33,7 +32,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
@ -71,28 +69,10 @@ public abstract class AbstractNiFiRegistryCommand<R extends Result> extends Abst
public abstract R doExecute(final NiFiRegistryClient client, final Properties properties)
throws IOException, NiFiRegistryException, ParseException;
/*
* NOTE: This will bring back every item in the registry. We should create an end-point on the registry side
* to retrieve a flow by id and remove this later.
*/
protected String getBucketId(final NiFiRegistryClient client, final String flowId) throws IOException, NiFiRegistryException {
final List<BucketItem> items = client.getItemsClient().getAll();
final Optional<BucketItem> matchingItem = items.stream()
.filter(i -> i.getIdentifier().equals(flowId))
.findFirst();
if (!matchingItem.isPresent()) {
throw new NiFiRegistryException("Versioned flow does not exist with id " + flowId);
}
return matchingItem.get().getBucketIdentifier();
}
protected List<Integer> getVersions(final NiFiRegistryClient client, final String bucketId, final String flowId)
protected List<Integer> getVersions(final NiFiRegistryClient client, final String flowId)
throws NiFiRegistryException, IOException {
final FlowSnapshotClient srcSnapshotClient = client.getFlowSnapshotClient();
final List<VersionedFlowSnapshotMetadata> srcVersionMetadata = srcSnapshotClient.getSnapshotMetadata(bucketId, flowId);
final List<VersionedFlowSnapshotMetadata> srcVersionMetadata = srcSnapshotClient.getSnapshotMetadata(flowId);
return srcVersionMetadata.stream().map(s -> s.getVersion()).collect(Collectors.toList());
}

View File

@ -21,6 +21,7 @@ import org.apache.nifi.registry.client.FlowClient;
import org.apache.nifi.registry.client.FlowSnapshotClient;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.toolkit.cli.api.Context;
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
@ -58,16 +59,16 @@ public class DeleteFlow extends AbstractNiFiRegistryCommand<OkResult> {
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
final boolean forceDelete = properties.containsKey(CommandOption.FORCE.getLongName());
final String bucketId = getBucketId(client, flowId);
final FlowClient flowClient = client.getFlowClient();
final VersionedFlow versionedFlow = flowClient.get(flowId);
final FlowSnapshotClient flowSnapshotClient = client.getFlowSnapshotClient();
final List<VersionedFlowSnapshotMetadata> snapshotMetadata = flowSnapshotClient.getSnapshotMetadata(bucketId, flowId);
final List<VersionedFlowSnapshotMetadata> snapshotMetadata = flowSnapshotClient.getSnapshotMetadata(flowId);
if (snapshotMetadata != null && snapshotMetadata.size() > 0 && !forceDelete) {
throw new NiFiRegistryException("Flow has versions, use --" + CommandOption.FORCE.getLongName() + " to delete");
} else {
final FlowClient flowClient = client.getFlowClient();
flowClient.delete(bucketId, flowId);
flowClient.delete(versionedFlow.getBucketIdentifier(), versionedFlow.getIdentifier());
return new OkResult(getContext().isInteractive());
}
}

View File

@ -53,15 +53,12 @@ public class ExportFlowVersion extends AbstractNiFiRegistryCommand<VersionedFlow
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
final Integer version = getIntArg(properties, CommandOption.FLOW_VERSION);
// determine the bucket for the provided flow id
final String bucketId = getBucketId(client, flowId);
// if no version was provided then export the latest, otherwise use specific version
final VersionedFlowSnapshot versionedFlowSnapshot;
if (version == null) {
versionedFlowSnapshot = client.getFlowSnapshotClient().getLatest(bucketId, flowId);
versionedFlowSnapshot = client.getFlowSnapshotClient().getLatest(flowId);
} else {
versionedFlowSnapshot = client.getFlowSnapshotClient().get(bucketId, flowId, version);
versionedFlowSnapshot = client.getFlowSnapshotClient().get(flowId, version);
}
versionedFlowSnapshot.setFlow(null);

View File

@ -19,9 +19,11 @@ package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.registry.client.FlowClient;
import org.apache.nifi.registry.client.FlowSnapshotClient;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.toolkit.cli.api.Context;
@ -76,6 +78,7 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand<StringResult>
contents = IOUtils.toString(uri, StandardCharsets.UTF_8);
}
final FlowClient flowClient = client.getFlowClient();
final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient();
final ObjectMapper objectMapper = JacksonUtils.getObjectMapper();
@ -84,13 +87,12 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand<StringResult>
throw new IOException("Unable to deserialize flow version from " + inputFile);
}
// determine the bucket for the provided flow id
final String bucketId = getBucketId(client, flowId);
final VersionedFlow versionedFlow = flowClient.get(flowId);
// determine the latest existing version in the destination system
Integer version;
try {
final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(bucketId, flowId);
final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(flowId);
version = latestMetadata.getVersion() + 1;
} catch (NiFiRegistryException e) {
// when there are no versions it produces a 404 not found
@ -99,7 +101,7 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand<StringResult>
// create new metadata using the passed in bucket and flow in the target registry, keep comments
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
metadata.setBucketIdentifier(bucketId);
metadata.setBucketIdentifier(versionedFlow.getBucketIdentifier());
metadata.setFlowIdentifier(flowId);
metadata.setVersion(version);
metadata.setComments(deserializedSnapshot.getSnapshotMetadata().getComments());

View File

@ -53,10 +53,9 @@ public class ListFlowVersions extends AbstractNiFiRegistryCommand<VersionedFlowS
public VersionedFlowSnapshotMetadataResult doExecute(final NiFiRegistryClient client, final Properties properties)
throws ParseException, IOException, NiFiRegistryException {
final String flow = getRequiredArg(properties, CommandOption.FLOW_ID);
final String bucket = getBucketId(client, flow);
final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient();
final List<VersionedFlowSnapshotMetadata> snapshotMetadata = snapshotClient.getSnapshotMetadata(bucket, flow);
final List<VersionedFlowSnapshotMetadata> snapshotMetadata = snapshotClient.getSnapshotMetadata(flow);
return new VersionedFlowSnapshotMetadataResult(getResultType(properties), snapshotMetadata);
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
import org.apache.commons.cli.ParseException;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.toolkit.cli.api.Context;
@ -71,11 +72,28 @@ public class SyncFlowVersions extends AbstractNiFiRegistryCommand<StringResult>
final NiFiRegistryClient srcClient = getSourceClient(client, srcPropsValue);
final String srcBucketId = getBucketId(srcClient, srcFlowId);
final String destBucketId = getBucketId(client, destFlowId);
// ensure source flow exists
final List<Integer> srcVersions = getVersions(srcClient, srcBucketId, srcFlowId);
final List<Integer> destVersions = getVersions(client, destBucketId, destFlowId);
final VersionedFlow srcFlow;
try {
srcFlow = srcClient.getFlowClient().get(srcFlowId);
} catch (Exception e) {
throw new NiFiRegistryException("Error retrieving source flow : " + e.getMessage(), e);
}
// ensure destination flow exists
final VersionedFlow destFlow;
try {
destFlow = client.getFlowClient().get(destFlowId);
} catch (Exception e) {
throw new NiFiRegistryException("Error retrieving destination flow : " + e.getMessage(), e);
}
// get version list for source and dest
final List<Integer> srcVersions = getVersions(srcClient, srcFlow.getIdentifier());
final List<Integer> destVersions = getVersions(client, destFlow.getIdentifier());
if (destVersions.size() > srcVersions.size()) {
throw new NiFiRegistryException("Destination flow has more versions than source flow");
@ -95,12 +113,12 @@ public class SyncFlowVersions extends AbstractNiFiRegistryCommand<StringResult>
Collections.sort(srcVersions);
for (final Integer srcVersion : srcVersions) {
final VersionedFlowSnapshot srcFlowSnapshot = srcClient.getFlowSnapshotClient().get(srcBucketId, srcFlowId, srcVersion);
final VersionedFlowSnapshot srcFlowSnapshot = srcClient.getFlowSnapshotClient().get(srcFlowId, srcVersion);
srcFlowSnapshot.setFlow(null);
srcFlowSnapshot.setBucket(null);
final VersionedFlowSnapshotMetadata destMetadata = new VersionedFlowSnapshotMetadata();
destMetadata.setBucketIdentifier(destBucketId);
destMetadata.setBucketIdentifier(destFlow.getBucketIdentifier());
destMetadata.setFlowIdentifier(destFlowId);
destMetadata.setVersion(srcVersion);
destMetadata.setComments(srcFlowSnapshot.getSnapshotMetadata().getComments());

View File

@ -19,6 +19,7 @@ package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
import org.apache.commons.cli.ParseException;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.toolkit.cli.api.Context;
@ -74,25 +75,31 @@ public class TransferFlowVersion extends AbstractNiFiRegistryCommand<StringResul
final NiFiRegistryClient srcClient = getSourceClient(client, srcPropsValue);
// determine the bucket ids of the source and dest flows
final String srcBucketId = getBucketId(srcClient, srcFlowId);
final String destBucketId = getBucketId(client, destFlowId);
// get the snapshot of the source flow, either the version specified or the latest
final VersionedFlowSnapshot srcSnapshot;
if (srcFlowVersion == null) {
srcSnapshot = srcClient.getFlowSnapshotClient().getLatest(srcBucketId, srcFlowId);
srcSnapshot = srcClient.getFlowSnapshotClient().getLatest(srcFlowId);
} else {
srcSnapshot = srcClient.getFlowSnapshotClient().get(srcBucketId, srcFlowId, srcFlowVersion);
srcSnapshot = srcClient.getFlowSnapshotClient().get(srcFlowId, srcFlowVersion);
}
final Integer srcSnapshotFlowVersion = srcSnapshot.getSnapshotMetadata().getVersion();
// get the destination flow
final VersionedFlow destFlow;
try {
destFlow = client.getFlowClient().get(destFlowId);
} catch (Exception e) {
throw new NiFiRegistryException("Error retrieving destination flow : " + e.getMessage(), e);
}
// determine the next version number for the destination flow
final List<Integer> destVersions = getVersions(client, destBucketId, destFlowId);
final List<Integer> destVersions = getVersions(client, destFlow.getIdentifier());
final Integer destFlowVersion = destVersions.isEmpty() ? 1 : destVersions.get(0) + 1;
// create the new metadata for the destination snapshot
final VersionedFlowSnapshotMetadata destMetadata = new VersionedFlowSnapshotMetadata();
destMetadata.setBucketIdentifier(destBucketId);
destMetadata.setBucketIdentifier(destFlow.getBucketIdentifier());
destMetadata.setFlowIdentifier(destFlowId);
destMetadata.setVersion(destFlowVersion);
destMetadata.setComments(srcSnapshot.getSnapshotMetadata().getComments());
@ -107,7 +114,7 @@ public class TransferFlowVersion extends AbstractNiFiRegistryCommand<StringResul
if (getContext().isInteractive()) {
println();
println("Transferred version " + srcSnapshot.getSnapshotMetadata().getVersion()
println("Transferred version " + srcSnapshotFlowVersion
+ " of source flow to version " + destFlowVersion + " of destination flow");
}