From fe71c18ec58c1b4b2971d32b8184dd0d2dbba402 Mon Sep 17 00:00:00 2001 From: Andrew Grande Date: Sun, 11 Feb 2018 09:53:05 -0500 Subject: [PATCH] NIFI-4839 - Support both public URLs and local files as inputs for import actions. - The handling of empty canvas got lost in the merge, causing errors with a new NiFi instance. - Broaden support for input, now supportes both local files _and_ any public URL with a schema recognized by Java runtime. --- .../apache/nifi/toolkit/cli/CLICompleter.java | 2 +- .../cli/impl/client/nifi/ProcessGroupBox.java | 2 +- .../client/nifi/impl/JerseyFlowClient.java | 6 +- .../cli/impl/command/CommandOption.java | 2 +- .../registry/flow/ImportFlowVersion.java | 100 ++++++++++-------- 5 files changed, 65 insertions(+), 47 deletions(-) diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/CLICompleter.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/CLICompleter.java index 7f6c864688..dad5416588 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/CLICompleter.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/CLICompleter.java @@ -48,7 +48,7 @@ public class CLICompleter implements Completer { static { final Set args = new HashSet<>(); args.add("-" + CommandOption.PROPERTIES.getShortName()); - args.add("-" + CommandOption.INPUT_FILE.getShortName()); + args.add("-" + CommandOption.INPUT_SOURCE.getShortName()); args.add("-" + CommandOption.OUTPUT_FILE.getShortName()); FILE_COMPLETION_ARGS = Collections.unmodifiableSet(args); } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupBox.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupBox.java index ce1cac7703..b671e26dd3 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupBox.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupBox.java @@ -58,7 +58,7 @@ public class ProcessGroupBox implements Comparable { public boolean intersects(ProcessGroupBox other) { - // adapted for java.awt Rectangle, we don't want to import it + // adapted from java.awt Rectangle, we don't want to import it // assume everything to be of the PG size for simplicity int tw = PG_SIZE_WIDTH; int th = PG_SIZE_HEIGHT; diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyFlowClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyFlowClient.java index 26c082afb2..c82a9917a1 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyFlowClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyFlowClient.java @@ -108,9 +108,13 @@ public class JerseyFlowClient extends AbstractJerseyClient implements FlowClient pgComponents.addAll(flowDTO.getLabels()); final Set positions = pgComponents.stream() - .map(c -> c.getPosition()) + .map(ComponentEntity::getPosition) .collect(Collectors.toSet()); + if (positions.isEmpty()) { + return ProcessGroupBox.CANVAS_CENTER; + } + final List coords = positions.stream() .map(p -> new ProcessGroupBox(p.getX().intValue(), p.getY().intValue())) .collect(Collectors.toList()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java index 7f890cb61a..50986d2a49 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java @@ -25,7 +25,7 @@ public enum CommandOption { // General URL("u", "baseUrl", "The URL to execute the command against", true), - INPUT_FILE("i", "inputFile", "A file to read as input, must contain full path and filename", true), + INPUT_SOURCE("i", "input", "A local file to read as input contents, or a public URL to fetch", true), OUTPUT_FILE("o", "outputFile", "A file to write output to, must contain full path and filename", true), PROPERTIES("p", "properties", "A properties file to load arguments from, " + "command line values will override anything in the properties file, must contain full path to file", true), diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportFlowVersion.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportFlowVersion.java index 3e739de451..21df12340a 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportFlowVersion.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportFlowVersion.java @@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.cli.impl.command.registry.flow; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; import org.apache.nifi.registry.client.FlowSnapshotClient; import org.apache.nifi.registry.client.NiFiRegistryClient; import org.apache.nifi.registry.client.NiFiRegistryException; @@ -28,8 +29,12 @@ import org.apache.nifi.toolkit.cli.impl.command.CommandOption; import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand; import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils; -import java.io.FileInputStream; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; import java.util.Properties; /** @@ -50,55 +55,64 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand { @Override public void doInitialize(final Context context) { addOption(CommandOption.FLOW_ID.createOption()); - addOption(CommandOption.INPUT_FILE.createOption()); + addOption(CommandOption.INPUT_SOURCE.createOption()); } @Override protected void doExecute(final NiFiRegistryClient client, final Properties properties) throws ParseException, IOException, NiFiRegistryException { final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID); - final String inputFile = getRequiredArg(properties, CommandOption.INPUT_FILE); + final String inputFile = getRequiredArg(properties, CommandOption.INPUT_SOURCE); - try (final FileInputStream in = new FileInputStream(inputFile)) { - final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient(); - - final ObjectMapper objectMapper = JacksonUtils.getObjectMapper(); - final VersionedFlowSnapshot deserializedSnapshot = objectMapper.readValue(in, VersionedFlowSnapshot.class); - if (deserializedSnapshot == null) { - throw new IOException("Unable to deserialize flow version from " + inputFile); - } - - // determine the bucket for the provided flow id - final String bucketId = getBucketId(client, flowId); - - // determine the latest existing version in the destination system - Integer version; - try { - final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(bucketId, flowId); - version = latestMetadata.getVersion() + 1; - } catch (NiFiRegistryException e) { - // when there are no versions it produces a 404 not found - version = new Integer(1); - } - - // create new metadata using the passed in bucket and flow in the target registry, keep comments - final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); - metadata.setBucketIdentifier(bucketId); - metadata.setFlowIdentifier(flowId); - metadata.setVersion(version); - metadata.setComments(deserializedSnapshot.getSnapshotMetadata().getComments()); - - // create a new snapshot using the new metadata and the contents from the deserialized snapshot - final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); - snapshot.setSnapshotMetadata(metadata); - snapshot.setFlowContents(deserializedSnapshot.getFlowContents()); - - - final VersionedFlowSnapshot createdSnapshot = snapshotClient.create(snapshot); - final VersionedFlowSnapshotMetadata createdMetadata = createdSnapshot.getSnapshotMetadata(); - - println(String.valueOf(createdMetadata.getVersion())); + String contents; + try { + // try a public resource URL + URL url = new URL(inputFile); + contents = IOUtils.toString(url, StandardCharsets.UTF_8); + } catch (MalformedURLException e) { + // assume a local file then + URI uri = Paths.get(inputFile).toAbsolutePath().toUri(); + contents = IOUtils.toString(uri, StandardCharsets.UTF_8); } - } + + final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient(); + + final ObjectMapper objectMapper = JacksonUtils.getObjectMapper(); + final VersionedFlowSnapshot deserializedSnapshot = objectMapper.readValue(contents, VersionedFlowSnapshot.class); + if (deserializedSnapshot == null) { + throw new IOException("Unable to deserialize flow version from " + inputFile); + } + + // determine the bucket for the provided flow id + final String bucketId = getBucketId(client, flowId); + + // determine the latest existing version in the destination system + Integer version; + try { + final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(bucketId, flowId); + version = latestMetadata.getVersion() + 1; + } catch (NiFiRegistryException e) { + // when there are no versions it produces a 404 not found + version = 1; + } + + // create new metadata using the passed in bucket and flow in the target registry, keep comments + final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); + metadata.setBucketIdentifier(bucketId); + metadata.setFlowIdentifier(flowId); + metadata.setVersion(version); + metadata.setComments(deserializedSnapshot.getSnapshotMetadata().getComments()); + + // create a new snapshot using the new metadata and the contents from the deserialized snapshot + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setSnapshotMetadata(metadata); + snapshot.setFlowContents(deserializedSnapshot.getFlowContents()); + + + final VersionedFlowSnapshot createdSnapshot = snapshotClient.create(snapshot); + final VersionedFlowSnapshotMetadata createdMetadata = createdSnapshot.getSnapshotMetadata(); + + println(String.valueOf(createdMetadata.getVersion())); +} }