NIFI-4839 - Support both public URLs and local files as inputs for import actions.

- The handling of empty canvas got lost in the merge, causing errors with a new NiFi instance.
- Broaden support for input, now supportes both local files _and_ any public URL with a schema recognized by Java runtime.
This commit is contained in:
Andrew Grande 2018-02-11 09:53:05 -05:00 committed by Pierre Villard
parent c1c808002c
commit fe71c18ec5
5 changed files with 65 additions and 47 deletions

View File

@ -48,7 +48,7 @@ public class CLICompleter implements Completer {
static {
final Set<String> args = new HashSet<>();
args.add("-" + CommandOption.PROPERTIES.getShortName());
args.add("-" + CommandOption.INPUT_FILE.getShortName());
args.add("-" + CommandOption.INPUT_SOURCE.getShortName());
args.add("-" + CommandOption.OUTPUT_FILE.getShortName());
FILE_COMPLETION_ARGS = Collections.unmodifiableSet(args);
}

View File

@ -58,7 +58,7 @@ public class ProcessGroupBox implements Comparable<ProcessGroupBox> {
public boolean intersects(ProcessGroupBox other) {
// adapted for java.awt Rectangle, we don't want to import it
// adapted from java.awt Rectangle, we don't want to import it
// assume everything to be of the PG size for simplicity
int tw = PG_SIZE_WIDTH;
int th = PG_SIZE_HEIGHT;

View File

@ -108,9 +108,13 @@ public class JerseyFlowClient extends AbstractJerseyClient implements FlowClient
pgComponents.addAll(flowDTO.getLabels());
final Set<PositionDTO> positions = pgComponents.stream()
.map(c -> c.getPosition())
.map(ComponentEntity::getPosition)
.collect(Collectors.toSet());
if (positions.isEmpty()) {
return ProcessGroupBox.CANVAS_CENTER;
}
final List<ProcessGroupBox> coords = positions.stream()
.map(p -> new ProcessGroupBox(p.getX().intValue(), p.getY().intValue()))
.collect(Collectors.toList());

View File

@ -25,7 +25,7 @@ public enum CommandOption {
// General
URL("u", "baseUrl", "The URL to execute the command against", true),
INPUT_FILE("i", "inputFile", "A file to read as input, must contain full path and filename", true),
INPUT_SOURCE("i", "input", "A local file to read as input contents, or a public URL to fetch", true),
OUTPUT_FILE("o", "outputFile", "A file to write output to, must contain full path and filename", true),
PROPERTIES("p", "properties", "A properties file to load arguments from, " +
"command line values will override anything in the properties file, must contain full path to file", true),

View File

@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.registry.client.FlowSnapshotClient;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
@ -28,8 +29,12 @@ import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Properties;
/**
@ -50,55 +55,64 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand {
@Override
public void doInitialize(final Context context) {
addOption(CommandOption.FLOW_ID.createOption());
addOption(CommandOption.INPUT_FILE.createOption());
addOption(CommandOption.INPUT_SOURCE.createOption());
}
@Override
protected void doExecute(final NiFiRegistryClient client, final Properties properties)
throws ParseException, IOException, NiFiRegistryException {
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
final String inputFile = getRequiredArg(properties, CommandOption.INPUT_FILE);
final String inputFile = getRequiredArg(properties, CommandOption.INPUT_SOURCE);
try (final FileInputStream in = new FileInputStream(inputFile)) {
final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient();
final ObjectMapper objectMapper = JacksonUtils.getObjectMapper();
final VersionedFlowSnapshot deserializedSnapshot = objectMapper.readValue(in, VersionedFlowSnapshot.class);
if (deserializedSnapshot == null) {
throw new IOException("Unable to deserialize flow version from " + inputFile);
}
// determine the bucket for the provided flow id
final String bucketId = getBucketId(client, flowId);
// determine the latest existing version in the destination system
Integer version;
try {
final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(bucketId, flowId);
version = latestMetadata.getVersion() + 1;
} catch (NiFiRegistryException e) {
// when there are no versions it produces a 404 not found
version = new Integer(1);
}
// create new metadata using the passed in bucket and flow in the target registry, keep comments
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
metadata.setBucketIdentifier(bucketId);
metadata.setFlowIdentifier(flowId);
metadata.setVersion(version);
metadata.setComments(deserializedSnapshot.getSnapshotMetadata().getComments());
// create a new snapshot using the new metadata and the contents from the deserialized snapshot
final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
snapshot.setSnapshotMetadata(metadata);
snapshot.setFlowContents(deserializedSnapshot.getFlowContents());
final VersionedFlowSnapshot createdSnapshot = snapshotClient.create(snapshot);
final VersionedFlowSnapshotMetadata createdMetadata = createdSnapshot.getSnapshotMetadata();
println(String.valueOf(createdMetadata.getVersion()));
String contents;
try {
// try a public resource URL
URL url = new URL(inputFile);
contents = IOUtils.toString(url, StandardCharsets.UTF_8);
} catch (MalformedURLException e) {
// assume a local file then
URI uri = Paths.get(inputFile).toAbsolutePath().toUri();
contents = IOUtils.toString(uri, StandardCharsets.UTF_8);
}
}
final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient();
final ObjectMapper objectMapper = JacksonUtils.getObjectMapper();
final VersionedFlowSnapshot deserializedSnapshot = objectMapper.readValue(contents, VersionedFlowSnapshot.class);
if (deserializedSnapshot == null) {
throw new IOException("Unable to deserialize flow version from " + inputFile);
}
// determine the bucket for the provided flow id
final String bucketId = getBucketId(client, flowId);
// determine the latest existing version in the destination system
Integer version;
try {
final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(bucketId, flowId);
version = latestMetadata.getVersion() + 1;
} catch (NiFiRegistryException e) {
// when there are no versions it produces a 404 not found
version = 1;
}
// create new metadata using the passed in bucket and flow in the target registry, keep comments
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
metadata.setBucketIdentifier(bucketId);
metadata.setFlowIdentifier(flowId);
metadata.setVersion(version);
metadata.setComments(deserializedSnapshot.getSnapshotMetadata().getComments());
// create a new snapshot using the new metadata and the contents from the deserialized snapshot
final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
snapshot.setSnapshotMetadata(metadata);
snapshot.setFlowContents(deserializedSnapshot.getFlowContents());
final VersionedFlowSnapshot createdSnapshot = snapshotClient.create(snapshot);
final VersionedFlowSnapshotMetadata createdMetadata = createdSnapshot.getSnapshotMetadata();
println(String.valueOf(createdMetadata.getVersion()));
}
}