mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 18:48:51 +00:00
NIFI-4839 Improving back-ref support so that ReferenceResolver is passed the option being resolved
- Adding ResolvedReference to encapsulate the results of resolving a back-reference. - Update README.md - Added OkResult for delete commands - Added sync-flow-versions and transfer-flow-version to registry commands - Returning appropriate status code when exiting standalone mode - Adding security section to README Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2477.
This commit is contained in:
parent
2fd24b78e6
commit
5041bea773
@ -22,14 +22,14 @@ Most commands will require specifying a baseUrl for the NiFi or NiFi registry in
|
||||
|
||||
An example command to list the buckets in a NiFi Registry instance would be the following:
|
||||
|
||||
./bin/cli.sh nifi-reg list-buckets -u http://localhost:18080
|
||||
./bin/cli.sh registry list-buckets -u http://localhost:18080
|
||||
|
||||
In order to avoid specifying the URL (and possibly other optional arguments for TLS) on every command,
|
||||
you can define a properties file containing the reptitive arguments.
|
||||
you can define a properties file containing the repetitive arguments.
|
||||
|
||||
An example properties file for a local NiFi Registry instance would look like the following:
|
||||
|
||||
baseUrl=https://localhost:18443
|
||||
baseUrl=http://localhost:18080
|
||||
keystore=
|
||||
keystoreType=
|
||||
keystorePasswd=
|
||||
@ -37,10 +37,11 @@ An example properties file for a local NiFi Registry instance would look like th
|
||||
truststore=
|
||||
truststoreType=
|
||||
truststorePasswd=
|
||||
proxiedEntity=
|
||||
|
||||
This properties file can then be used on a command by specifying -p <path-to-props-file> :
|
||||
|
||||
./bin/cli.sh nifi-reg list-buckets -p /path/to/local-nifi-registry.properties
|
||||
./bin/cli.sh registry list-buckets -p /path/to/local-nifi-registry.properties
|
||||
|
||||
You could then maintain a properties file for each environment you plan to interact with, such as dev, qa, prod.
|
||||
|
||||
@ -58,7 +59,7 @@ An example of setting the default property files would be following:
|
||||
This will write the above properties into the .nifi-cli.config in the user's home directory and will
|
||||
allow commands to be executed without specifying a URL or properties file:
|
||||
|
||||
./bin/cli.sh nifi-reg list-buckets
|
||||
./bin/cli.sh registry list-buckets
|
||||
|
||||
The above command will now use the baseUrl from *local-nifi-registry.properties*.
|
||||
|
||||
@ -68,6 +69,56 @@ The order of resolving an argument is the following:
|
||||
* A properties file argument (-p) overrides the session
|
||||
* The session is used when nothing else is specified
|
||||
|
||||
## Security Configuration
|
||||
|
||||
If NiFi and NiFi Registry are secured, then commands executed from the CLI will need to make a TLS connection and
|
||||
authenticate as a user with permissions to perform the desired action.
|
||||
|
||||
Currently the CLI supports authenticating with a client certificate and an optional proxied-entity. A common scenario
|
||||
would be running the CLI from one of the nodes where NiFi or NiFi Registry is installed, which allows the CLI to use
|
||||
the same key store and trust store as the NiFi/NiFi Registry instance.
|
||||
|
||||
The security configuration can be specified per-command, or in one of the properties files described in the previous section.
|
||||
|
||||
The examples below are for NiFi Registry, but the same concept applies for NiFi commands.
|
||||
|
||||
### Example - Secure NiFi Registry without Proxied-Entity
|
||||
|
||||
Assuming we have a keystore containing the certificate for *'CN=user1, OU=NIFI'*, an example properties file would
|
||||
be the following:
|
||||
|
||||
baseUrl=https://localhost:18443
|
||||
keystore=/path/to/keystore.jks
|
||||
keystoreType=JKS
|
||||
keystorePasswd=changeme
|
||||
keyPasswd=changeme
|
||||
truststore=/path/to/truststore.jks
|
||||
truststoreType=JKS
|
||||
truststorePasswd=changeme
|
||||
|
||||
In this example, commands will be executed as *'CN=user1, OU=NIFI'*. This user would need to be a user in NiFi Registry,
|
||||
and commands accessing buckets would be restricted to buckets this user has access to.
|
||||
|
||||
### Example - Secure NiFi Registry with Proxied-Entity
|
||||
|
||||
Assuming we have access to the keystore of NiFi Registry itself, and that NiFi Registry is also configured to allow
|
||||
Kerberos or LDAP authentication, an example properties file would be the following:
|
||||
|
||||
baseUrl=https://localhost:18443
|
||||
keystore=/path/to/keystore.jks
|
||||
keystoreType=JKS
|
||||
keystorePasswd=changeme
|
||||
keyPasswd=changeme
|
||||
truststore=/path/to/truststore.jks
|
||||
truststoreType=JKS
|
||||
truststorePasswd=changeme
|
||||
proxiedEntity=user1@NIFI.COM
|
||||
|
||||
In this example, the certificate in keystore.jks would be for the NiFi Registry server, for example *'CN=localhost, OU=NIFI'*.
|
||||
This identity would need to be defined as a user in NiFi Registry and given permissions to 'Proxy'.
|
||||
|
||||
*'CN=localhost, OU=NIFI'* would be proxying commands to be executed as *'user1@NIFI.COM'*.
|
||||
|
||||
## Interactive Usage
|
||||
|
||||
In interactive mode the tab key can be used to perform auto-completion.
|
||||
@ -75,7 +126,7 @@ In interactive mode the tab key can be used to perform auto-completion.
|
||||
For example, typing tab at an empty prompt should display possible commands for the first argument:
|
||||
|
||||
#>
|
||||
exit help nifi nifi-reg session
|
||||
exit help nifi registry session
|
||||
|
||||
Typing "nifi " and then a tab will show the sub-commands for NiFi:
|
||||
|
||||
@ -124,6 +175,65 @@ Example of json output for list-buckets:
|
||||
}
|
||||
} ]
|
||||
|
||||
## Back Referencing
|
||||
|
||||
When using the interactive CLI, a common scenario will be using an id from a previous
|
||||
result as the input to the next command. Back-referencing provides a shortcut for
|
||||
referencing a result from the previous command via a positional reference.
|
||||
|
||||
NOTE: Not every command produces back-references. To determine if a command
|
||||
supports back-referencing, check the usage.
|
||||
|
||||
#> registry list-buckets help
|
||||
|
||||
Lists the buckets that the current user has access to.
|
||||
|
||||
PRODUCES BACK-REFERENCES
|
||||
|
||||
A common scenario for utilizing back-references would be the following:
|
||||
|
||||
1) User starts by exploring the available buckets in a registry instance
|
||||
|
||||
#> registry list-buckets
|
||||
|
||||
# Name Id Description
|
||||
- ------------ ------------------------------------ -----------
|
||||
1 My Bucket 3c7b7467-0012-4d8f-a918-6aa42b6b9d39 (empty)
|
||||
2 Other Bucket 175fb557-43a2-4abb-871f-81a354f47bc2 (empty)
|
||||
|
||||
2) User then views the flows in one of the buckets using a back-reference to the bucket id from the previous result in position 1
|
||||
|
||||
#> registry list-flows -b &1
|
||||
|
||||
Using a positional back-reference for 'My Bucket'
|
||||
|
||||
# Name Id Description
|
||||
- ------- ------------------------------------ ----------------
|
||||
1 My Flow 06acb207-d2f1-447f-85ed-9b8672fe6d30 This is my flow.
|
||||
|
||||
3) User then views the version of the flow using a back-reference to the flow id from the previous result in position 1
|
||||
|
||||
#> registry list-flow-versions -f &1
|
||||
|
||||
Using a positional back-reference for 'My Flow'
|
||||
|
||||
Ver Date Author Message
|
||||
--- -------------------------- ------------------------ -------------------------------------
|
||||
1 Tue, Jan 23 2018 09:48 EST anonymous This is the first version of my flow.
|
||||
|
||||
4) User deploys version 1 of the flow using back-references to the bucket and flow id from step 2
|
||||
|
||||
#> nifi pg-import -b &1 -f &1 -fv 1
|
||||
|
||||
Using a positional back-reference for 'My Bucket'
|
||||
|
||||
Using a positional back-reference for 'My Flow'
|
||||
|
||||
9bd157d4-0161-1000-b946-c1f9b1832efd
|
||||
|
||||
The reason step 4 was able to reference the results from step 2, is because the list-flow-versions
|
||||
command in step 3 does not produce back-references, so the results from step 2 are still available.
|
||||
|
||||
## Adding Commands
|
||||
|
||||
To add a NiFi command, create a new class that extends AbstractNiFiCommand:
|
||||
@ -151,4 +261,4 @@ Add the new command to NiFiCommandGroup:
|
||||
commands.add(new MyCommand());
|
||||
|
||||
To add a NiFi Registry command, perform the same steps, but extend from
|
||||
AbstractNiFiRegistryCommand, and add the command to NiFiRegistryCommandGroup.
|
||||
AbstractNiFiRegistryCommand, and add the command to NiFiRegistryCommandGroup.
|
||||
|
@ -82,7 +82,6 @@
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>2.6</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -47,11 +47,11 @@ public class CLICompleter implements Completer {
|
||||
private static final Set<String> FILE_COMPLETION_ARGS;
|
||||
static {
|
||||
final Set<String> args = new HashSet<>();
|
||||
args.add("-" + CommandOption.PROPERTIES.getShortName());
|
||||
args.add("-" + CommandOption.INPUT_SOURCE.getShortName());
|
||||
args.add("-" + CommandOption.OUTPUT_FILE.getShortName());
|
||||
args.add("-" + CommandOption.NIFI_REG_PROPS.getShortName());
|
||||
args.add("-" + CommandOption.NIFI_PROPS.getShortName());
|
||||
for (final CommandOption option : CommandOption.values()) {
|
||||
if (option.isFile()) {
|
||||
args.add("-" + option.getShortName());
|
||||
}
|
||||
}
|
||||
FILE_COMPLETION_ARGS = Collections.unmodifiableSet(args);
|
||||
}
|
||||
|
||||
|
@ -67,9 +67,15 @@ public class CLIMain {
|
||||
if (args == null || args.length == 0) {
|
||||
runInteractiveCLI();
|
||||
} else {
|
||||
runSingleCommand(args);
|
||||
System.out.println();
|
||||
System.out.flush();
|
||||
// in standalone mode we want to make sure the process exits with the correct status
|
||||
try {
|
||||
final int returnCode = runSingleCommand(args);
|
||||
System.exit(returnCode);
|
||||
} catch (Exception e) {
|
||||
// shouldn't really get here, but just in case
|
||||
e.printStackTrace();
|
||||
System.exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -130,13 +136,13 @@ public class CLIMain {
|
||||
*
|
||||
* @param args the args passed in from the command line
|
||||
*/
|
||||
private static void runSingleCommand(final String[] args) {
|
||||
private static int runSingleCommand(final String[] args) {
|
||||
final Context context = createContext(System.out, false);
|
||||
final Map<String,Command> topLevelCommands = CommandFactory.createTopLevelCommands(context);
|
||||
final Map<String,CommandGroup> commandGroups = CommandFactory.createCommandGroups(context);
|
||||
|
||||
final CommandProcessor commandProcessor = new CommandProcessor(topLevelCommands, commandGroups, context);
|
||||
commandProcessor.process(args);
|
||||
return commandProcessor.process(args);
|
||||
}
|
||||
|
||||
private static Context createContext(final PrintStream output, final boolean isInteractive) {
|
||||
|
@ -16,6 +16,8 @@
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.api;
|
||||
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
|
||||
/**
|
||||
* An object that is capable of resolving a positional reference to some value that corresponds with the reference.
|
||||
*/
|
||||
@ -24,10 +26,11 @@ public interface ReferenceResolver {
|
||||
/**
|
||||
* Resolves the passed in positional reference to it's corresponding value.
|
||||
*
|
||||
* @param option the option that the reference is being resolved for, implementers should protect against a possible null option
|
||||
* @param position a position in this back reference
|
||||
* @return the resolved value for the given position
|
||||
*/
|
||||
String resolve(Integer position);
|
||||
ResolvedReference resolve(CommandOption option, Integer position);
|
||||
|
||||
/**
|
||||
* @return true if the there are no references to resolve, false otherwise
|
||||
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.api;
|
||||
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
|
||||
/**
|
||||
* Represents a resolved back-reference produced by a ReferenceResolver.
|
||||
*/
|
||||
public class ResolvedReference {
|
||||
|
||||
private final CommandOption option;
|
||||
|
||||
private final Integer position;
|
||||
|
||||
private final String displayName;
|
||||
|
||||
private final String resolvedValue;
|
||||
|
||||
public ResolvedReference(
|
||||
final CommandOption option,
|
||||
final Integer position,
|
||||
final String displayName,
|
||||
final String resolvedValue) {
|
||||
this.option = option;
|
||||
this.position = position;
|
||||
this.displayName = displayName;
|
||||
this.resolvedValue = resolvedValue;
|
||||
Validate.notNull(this.position);
|
||||
Validate.notNull(this.displayName);
|
||||
Validate.notNull(this.resolvedValue);
|
||||
}
|
||||
|
||||
public CommandOption getOption() {
|
||||
return option;
|
||||
}
|
||||
|
||||
public Integer getPosition() {
|
||||
return position;
|
||||
}
|
||||
|
||||
public String getDisplayName() {
|
||||
return displayName;
|
||||
}
|
||||
|
||||
public String getResolvedValue() {
|
||||
return resolvedValue;
|
||||
}
|
||||
}
|
@ -124,7 +124,7 @@ public abstract class AbstractCommand<R extends Result> implements Command<R> {
|
||||
|
||||
final PrintWriter printWriter = new PrintWriter(output);
|
||||
|
||||
final int width = 160;
|
||||
final int width = 80;
|
||||
final HelpFormatter hf = new HelpFormatter();
|
||||
hf.setWidth(width);
|
||||
|
||||
@ -174,7 +174,7 @@ public abstract class AbstractCommand<R extends Result> implements Command<R> {
|
||||
protected String getRequiredArg(final Properties properties, final CommandOption option) throws MissingOptionException {
|
||||
final String argValue = properties.getProperty(option.getLongName());
|
||||
if (StringUtils.isBlank(argValue)) {
|
||||
throw new MissingOptionException("Missing required option '" + option.getLongName() + "'");
|
||||
throw new MissingOptionException("Missing required option --" + option.getLongName());
|
||||
}
|
||||
return argValue;
|
||||
}
|
||||
@ -195,7 +195,7 @@ public abstract class AbstractCommand<R extends Result> implements Command<R> {
|
||||
protected Integer getRequiredIntArg(final Properties properties, final CommandOption option) throws MissingOptionException {
|
||||
final String argValue = properties.getProperty(option.getLongName());
|
||||
if (StringUtils.isBlank(argValue)) {
|
||||
throw new MissingOptionException("Missing required option '" + option.getLongName() + "'");
|
||||
throw new MissingOptionException("Missing required option --" + option.getLongName());
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -25,13 +25,13 @@ public enum CommandOption {
|
||||
|
||||
// General
|
||||
URL("u", "baseUrl", "The URL to execute the command against", true),
|
||||
INPUT_SOURCE("i", "input", "A local file to read as input contents, or a public URL to fetch", true),
|
||||
OUTPUT_FILE("o", "outputFile", "A file to write output to, must contain full path and filename", true),
|
||||
INPUT_SOURCE("i", "input", "A local file to read as input contents, or a public URL to fetch", true, true),
|
||||
OUTPUT_FILE("o", "outputFile", "A file to write output to, must contain full path and filename", true, true),
|
||||
PROPERTIES("p", "properties", "A properties file to load arguments from, " +
|
||||
"command line values will override anything in the properties file, must contain full path to file", true),
|
||||
"command line values will override anything in the properties file, must contain full path to file", true, true),
|
||||
|
||||
NIFI_PROPS("nifiProps", "nifiProps", "A properties file to load for NiFi config", true),
|
||||
NIFI_REG_PROPS("nifiRegProps", "nifiRegProps", "A properties file to load for NiFi Registry config", true),
|
||||
NIFI_PROPS("nifiProps", "nifiProps", "A properties file to load for NiFi config", true, true),
|
||||
NIFI_REG_PROPS("nifiRegProps", "nifiRegProps", "A properties file to load for NiFi Registry config", true, true),
|
||||
|
||||
// Registry - Buckets
|
||||
BUCKET_ID("b", "bucketIdentifier", "A bucket identifier", true),
|
||||
@ -44,6 +44,11 @@ public enum CommandOption {
|
||||
FLOW_DESC("fd", "flowDesc", "A flow description", true),
|
||||
FLOW_VERSION("fv", "flowVersion", "A version of a flow", true),
|
||||
|
||||
// Registry - Source options for when there are two registries involved and one is a source
|
||||
SRC_PROPS("sp", "sourceProps", "A properties file to load for the source", true, true),
|
||||
SRC_FLOW_ID("sf", "sourceFlowIdentifier", "A flow identifier from the source registry", true),
|
||||
SRC_FLOW_VERSION("sfv", "sourceFlowVersion", "A version of a flow from the source registry", true),
|
||||
|
||||
// NiFi - Registries
|
||||
REGISTRY_CLIENT_ID("rcid", "registryClientId", "The id of a registry client", true),
|
||||
REGISTRY_CLIENT_NAME("rcn", "registryClientName", "The name of the registry client", true),
|
||||
@ -78,14 +83,21 @@ public enum CommandOption {
|
||||
private final String longName;
|
||||
private final String description;
|
||||
private final boolean hasArg;
|
||||
private final boolean isFile;
|
||||
|
||||
CommandOption(final String shortName, final String longName, final String description, final boolean hasArg) {
|
||||
this(shortName, longName, description, hasArg, false);
|
||||
}
|
||||
|
||||
CommandOption(final String shortName, final String longName, final String description, final boolean hasArg, final boolean isFile) {
|
||||
this.shortName = shortName;
|
||||
this.longName = longName;
|
||||
this.description = description;
|
||||
this.hasArg = hasArg;
|
||||
this.isFile = isFile;
|
||||
}
|
||||
|
||||
|
||||
public String getShortName() {
|
||||
return shortName;
|
||||
}
|
||||
@ -98,6 +110,10 @@ public enum CommandOption {
|
||||
return description;
|
||||
}
|
||||
|
||||
public boolean isFile() {
|
||||
return isFile;
|
||||
}
|
||||
|
||||
public Option createOption() {
|
||||
return Option.builder(shortName).longOpt(longName).desc(description).hasArg(hasArg).build();
|
||||
}
|
||||
|
@ -22,20 +22,20 @@ import org.apache.commons.cli.DefaultParser;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
||||
import org.apache.nifi.toolkit.cli.api.Command;
|
||||
import org.apache.nifi.toolkit.cli.api.CommandException;
|
||||
import org.apache.nifi.toolkit.cli.api.CommandGroup;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.api.ReferenceResolver;
|
||||
import org.apache.nifi.toolkit.cli.api.Referenceable;
|
||||
import org.apache.nifi.toolkit.cli.api.ResolvedReference;
|
||||
import org.apache.nifi.toolkit.cli.api.Result;
|
||||
import org.apache.nifi.toolkit.cli.api.WritableResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ -117,82 +117,101 @@ public class CommandProcessor {
|
||||
return;
|
||||
}
|
||||
|
||||
final List<ResolvedReference> resolvedReferences = new ArrayList<>();
|
||||
|
||||
for (int i=0; i < args.length; i++) {
|
||||
final String arg = args[i];
|
||||
if (arg == null || !arg.startsWith(BACK_REF_INDICATOR)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (context.isInteractive()) {
|
||||
context.getOutput().println();
|
||||
}
|
||||
|
||||
try {
|
||||
// attempt to determine which option is using the back-ref
|
||||
CommandOption option = null;
|
||||
if (i > 0) {
|
||||
String prevArg = args[i - 1];
|
||||
if (prevArg.startsWith("--")) {
|
||||
prevArg = prevArg.substring(2);
|
||||
} else if (prevArg.startsWith("-")) {
|
||||
prevArg = prevArg.substring(1);
|
||||
}
|
||||
|
||||
for (CommandOption opt : CommandOption.values()) {
|
||||
if (opt.getShortName().equals(prevArg) || opt.getLongName().equals(prevArg)) {
|
||||
option = opt;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// use the option and position to resolve the back-ref, and if it resolves then replace the arg
|
||||
final Integer pos = Integer.valueOf(arg.substring(1));
|
||||
final String resolvedReference = referenceResolver.resolve(pos);
|
||||
final ResolvedReference resolvedReference = referenceResolver.resolve(option, pos);
|
||||
if (resolvedReference != null) {
|
||||
args[i] = resolvedReference;
|
||||
args[i] = resolvedReference.getResolvedValue();
|
||||
resolvedReferences.add(resolvedReference);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// skip
|
||||
}
|
||||
}
|
||||
|
||||
if (context.isInteractive()) {
|
||||
for (ResolvedReference resolvedRef : resolvedReferences) {
|
||||
out.println();
|
||||
out.printf("Using a positional back-reference for '%s'%n", resolvedRef.getDisplayName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void process(String[] args) {
|
||||
public int process(String[] args) {
|
||||
if (args == null || args.length == 0) {
|
||||
printBasicUsage(null);
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (CommandOption.HELP.getLongName().equalsIgnoreCase(args[0])) {
|
||||
if (args.length == 2 && "-v".equalsIgnoreCase(args[1])) {
|
||||
printBasicUsage(null, true);
|
||||
return;
|
||||
return 0;
|
||||
} else {
|
||||
printBasicUsage(null);
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
final String commandStr = args[0];
|
||||
if (topLevelCommands.containsKey(commandStr)) {
|
||||
processTopLevelCommand(commandStr, args);
|
||||
return processTopLevelCommand(commandStr, args);
|
||||
} else if (commandGroups.containsKey(commandStr)) {
|
||||
processGroupCommand(commandStr, args);
|
||||
return processGroupCommand(commandStr, args);
|
||||
} else {
|
||||
printBasicUsage("Unknown command '" + commandStr + "'");
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private void processTopLevelCommand(final String commandStr, final String[] args) {
|
||||
private int processTopLevelCommand(final String commandStr, final String[] args) {
|
||||
final Command command = topLevelCommands.get(commandStr);
|
||||
|
||||
if (command == null) {
|
||||
printBasicUsage("Unknown command '" + commandStr + "'");
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
|
||||
try {
|
||||
final String[] otherArgs = Arrays.copyOfRange(args, 1, args.length, String[].class);
|
||||
final CommandLine commandLine = parseCli(command, otherArgs);
|
||||
if (commandLine == null) {
|
||||
out.println("Unable to parse command line");
|
||||
return;
|
||||
}
|
||||
|
||||
processCommand(otherArgs, commandLine, command);
|
||||
|
||||
return processCommand(otherArgs, command);
|
||||
} catch (Exception e) {
|
||||
command.printUsage(e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private void processGroupCommand(final String commandGroupStr, final String[] args) {
|
||||
private int processGroupCommand(final String commandGroupStr, final String[] args) {
|
||||
if (args.length <= 1) {
|
||||
printBasicUsage("No command provided to " + commandGroupStr);
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
|
||||
final String commandStr = args[1];
|
||||
@ -205,25 +224,26 @@ public class CommandProcessor {
|
||||
|
||||
if (command == null) {
|
||||
printBasicUsage("Unknown command '" + commandGroupStr + " " + commandStr + "'");
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
|
||||
try {
|
||||
final String[] otherArgs = Arrays.copyOfRange(args, 2, args.length, String[].class);
|
||||
final CommandLine commandLine = parseCli(command, otherArgs);
|
||||
if (commandLine == null) {
|
||||
out.println("Unable to parse command line");
|
||||
return;
|
||||
}
|
||||
|
||||
processCommand(otherArgs, commandLine, command);
|
||||
|
||||
return processCommand(otherArgs, command);
|
||||
} catch (Exception e) {
|
||||
command.printUsage(e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private void processCommand(final String[] args, final CommandLine commandLine, final Command command) {
|
||||
// visible for testing
|
||||
int processCommand(final String[] args, final Command command) throws ParseException {
|
||||
final CommandLine commandLine = parseCli(command, args);
|
||||
if (commandLine == null) {
|
||||
out.println("Unable to parse command line");
|
||||
return -1;
|
||||
}
|
||||
|
||||
try {
|
||||
if (args.length == 1 && CommandOption.HELP.getLongName().equalsIgnoreCase(args[0])) {
|
||||
command.printUsage(null);
|
||||
@ -247,6 +267,9 @@ public class CommandProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
} catch (Exception e) {
|
||||
// CommandExceptions will wrap things like NiFiClientException, NiFiRegistryException, and IOException,
|
||||
// so for those we don't need to print the usage every time
|
||||
@ -263,8 +286,9 @@ public class CommandProcessor {
|
||||
e.printStackTrace(out);
|
||||
out.println();
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ public class GetRootId extends AbstractNiFiCommand<StringResult> {
|
||||
public StringResult doExecute(final NiFiClient client, final Properties properties)
|
||||
throws NiFiClientException, IOException {
|
||||
final FlowClient flowClient = client.getFlowClient();
|
||||
return new StringResult(flowClient.getRootGroupId());
|
||||
return new StringResult(flowClient.getRootGroupId(), getContext().isInteractive());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
|
||||
|
||||
final ProcessGroupClient pgClient = client.getProcessGroupClient();
|
||||
final ProcessGroupEntity createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity);
|
||||
return new StringResult(createdEntity.getId());
|
||||
return new StringResult(createdEntity.getId(), getContext().isInteractive());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -68,6 +68,6 @@ public class CreateRegistryClient extends AbstractNiFiCommand<StringResult> {
|
||||
clientEntity.setRevision(getInitialRevisionDTO());
|
||||
|
||||
final RegistryClientEntity createdEntity = client.getControllerClient().createRegistryClient(clientEntity);
|
||||
return new StringResult(createdEntity.getId());
|
||||
return new StringResult(createdEntity.getId(), getContext().isInteractive());
|
||||
}
|
||||
}
|
||||
|
@ -17,19 +17,25 @@
|
||||
package org.apache.nifi.toolkit.cli.impl.command.registry;
|
||||
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.registry.bucket.BucketItem;
|
||||
import org.apache.nifi.registry.client.FlowSnapshotClient;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryClient;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
|
||||
import org.apache.nifi.toolkit.cli.api.ClientFactory;
|
||||
import org.apache.nifi.toolkit.cli.api.CommandException;
|
||||
import org.apache.nifi.toolkit.cli.api.Result;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.AbstractPropertyCommand;
|
||||
import org.apache.nifi.toolkit.cli.impl.session.SessionVariable;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Base class for all NiFi Reg commands.
|
||||
@ -83,4 +89,32 @@ public abstract class AbstractNiFiRegistryCommand<R extends Result> extends Abst
|
||||
return matchingItem.get().getBucketIdentifier();
|
||||
}
|
||||
|
||||
protected List<Integer> getVersions(final NiFiRegistryClient client, final String bucketId, final String flowId)
|
||||
throws NiFiRegistryException, IOException {
|
||||
final FlowSnapshotClient srcSnapshotClient = client.getFlowSnapshotClient();
|
||||
final List<VersionedFlowSnapshotMetadata> srcVersionMetadata = srcSnapshotClient.getSnapshotMetadata(bucketId, flowId);
|
||||
return srcVersionMetadata.stream().map(s -> s.getVersion()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/*
|
||||
* If srcProps was specified then load the properties and create a new client for the source,
|
||||
* but if it wasn't then assume the source is the same registry we already know about
|
||||
*/
|
||||
protected NiFiRegistryClient getSourceClient(final NiFiRegistryClient client, final String srcPropsValue)
|
||||
throws IOException, org.apache.commons.cli.MissingOptionException {
|
||||
final NiFiRegistryClient srcClient;
|
||||
if (!StringUtils.isBlank(srcPropsValue)) {
|
||||
final Properties srcProps = new Properties();
|
||||
try (final InputStream in = new FileInputStream(srcPropsValue)) {
|
||||
srcProps.load(in);
|
||||
}
|
||||
|
||||
final ClientFactory<NiFiRegistryClient> clientFactory = getContext().getNiFiRegistryClientFactory();
|
||||
srcClient = clientFactory.createClient(srcProps);
|
||||
} else {
|
||||
srcClient = client;
|
||||
}
|
||||
return srcClient;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -27,6 +27,8 @@ import org.apache.nifi.toolkit.cli.impl.command.registry.flow.ExportFlowVersion;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.flow.ImportFlowVersion;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.flow.ListFlowVersions;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.flow.ListFlows;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.flow.SyncFlowVersions;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.flow.TransferFlowVersion;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.user.CurrentUser;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -56,6 +58,8 @@ public class NiFiRegistryCommandGroup extends AbstractCommandGroup {
|
||||
commandList.add(new ListFlowVersions());
|
||||
commandList.add(new ExportFlowVersion());
|
||||
commandList.add(new ImportFlowVersion());
|
||||
commandList.add(new SyncFlowVersions());
|
||||
commandList.add(new TransferFlowVersion());
|
||||
return new ArrayList<>(commandList);
|
||||
}
|
||||
}
|
||||
|
@ -62,6 +62,6 @@ public class CreateBucket extends AbstractNiFiRegistryCommand<StringResult> {
|
||||
|
||||
final BucketClient bucketClient = client.getBucketClient();
|
||||
final Bucket createdBucket = bucketClient.create(bucket);
|
||||
return new StringResult(createdBucket.getIdentifier());
|
||||
return new StringResult(createdBucket.getIdentifier(), getContext().isInteractive());
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ import org.apache.nifi.registry.flow.VersionedFlow;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.VoidResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.OkResult;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@ -34,10 +34,10 @@ import java.util.Properties;
|
||||
/**
|
||||
* Deletes a bucket from the given registry.
|
||||
*/
|
||||
public class DeleteBucket extends AbstractNiFiRegistryCommand<VoidResult> {
|
||||
public class DeleteBucket extends AbstractNiFiRegistryCommand<OkResult> {
|
||||
|
||||
public DeleteBucket() {
|
||||
super("delete-bucket", VoidResult.class);
|
||||
super("delete-bucket", OkResult.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -52,7 +52,7 @@ public class DeleteBucket extends AbstractNiFiRegistryCommand<VoidResult> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public VoidResult doExecute(final NiFiRegistryClient client, final Properties properties)
|
||||
public OkResult doExecute(final NiFiRegistryClient client, final Properties properties)
|
||||
throws IOException, NiFiRegistryException, ParseException {
|
||||
|
||||
final String bucketId = getRequiredArg(properties, CommandOption.BUCKET_ID);
|
||||
@ -66,7 +66,7 @@ public class DeleteBucket extends AbstractNiFiRegistryCommand<VoidResult> {
|
||||
} else {
|
||||
final BucketClient bucketClient = client.getBucketClient();
|
||||
bucketClient.delete(bucketId);
|
||||
return VoidResult.getInstance();
|
||||
return new OkResult(getContext().isInteractive());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -64,6 +64,6 @@ public class CreateFlow extends AbstractNiFiRegistryCommand<StringResult> {
|
||||
|
||||
final FlowClient flowClient = client.getFlowClient();
|
||||
final VersionedFlow createdFlow = flowClient.create(flow);
|
||||
return new StringResult(createdFlow.getIdentifier());
|
||||
return new StringResult(createdFlow.getIdentifier(), getContext().isInteractive());
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.VoidResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.OkResult;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@ -34,10 +34,10 @@ import java.util.Properties;
|
||||
/**
|
||||
* Deletes a flow from the given registry.
|
||||
*/
|
||||
public class DeleteFlow extends AbstractNiFiRegistryCommand<VoidResult> {
|
||||
public class DeleteFlow extends AbstractNiFiRegistryCommand<OkResult> {
|
||||
|
||||
public DeleteFlow() {
|
||||
super("delete-flow", VoidResult.class);
|
||||
super("delete-flow", OkResult.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -52,7 +52,7 @@ public class DeleteFlow extends AbstractNiFiRegistryCommand<VoidResult> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public VoidResult doExecute(final NiFiRegistryClient client, final Properties properties)
|
||||
public OkResult doExecute(final NiFiRegistryClient client, final Properties properties)
|
||||
throws IOException, NiFiRegistryException, ParseException {
|
||||
|
||||
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
|
||||
@ -68,7 +68,7 @@ public class DeleteFlow extends AbstractNiFiRegistryCommand<VoidResult> {
|
||||
} else {
|
||||
final FlowClient flowClient = client.getFlowClient();
|
||||
flowClient.delete(bucketId, flowId);
|
||||
return VoidResult.getInstance();
|
||||
return new OkResult(getContext().isInteractive());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ public class ImportFlowVersion extends AbstractNiFiRegistryCommand<StringResult>
|
||||
final VersionedFlowSnapshot createdSnapshot = snapshotClient.create(snapshot);
|
||||
final VersionedFlowSnapshotMetadata createdMetadata = createdSnapshot.getSnapshotMetadata();
|
||||
|
||||
return new StringResult(String.valueOf(createdMetadata.getVersion()));
|
||||
return new StringResult(String.valueOf(createdMetadata.getVersion()), getContext().isInteractive());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,120 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
|
||||
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryClient;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
|
||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.OkResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.StringResult;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
public class SyncFlowVersions extends AbstractNiFiRegistryCommand<StringResult> {
|
||||
|
||||
public SyncFlowVersions() {
|
||||
super("sync-flow-versions", StringResult.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Syncs the versions of a flow to another flow, which could be in a different bucket or registry. " +
|
||||
"This command assumes the intent is to maintain the exact version history across the two flows. " +
|
||||
"The list of versions from the source flow will be compared to the destination flow, and any " +
|
||||
"versions not present will be added. If --" + CommandOption.SRC_PROPS.getLongName() + " is not " +
|
||||
"provided then the source registry will be assumed to be the same as the destination registry.";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInitialize(final Context context) {
|
||||
// source properties
|
||||
addOption(CommandOption.SRC_PROPS.createOption());
|
||||
|
||||
// source flow id
|
||||
addOption(CommandOption.SRC_FLOW_ID.createOption());
|
||||
|
||||
// destination flow id
|
||||
addOption(CommandOption.FLOW_ID.createOption());
|
||||
|
||||
// destination properties will come from standard -p or nifi.reg.props in session
|
||||
}
|
||||
|
||||
@Override
|
||||
public StringResult doExecute(final NiFiRegistryClient client, final Properties properties)
|
||||
throws IOException, NiFiRegistryException, ParseException {
|
||||
|
||||
final String srcPropsValue = getArg(properties, CommandOption.SRC_PROPS);
|
||||
final String srcFlowId = getRequiredArg(properties, CommandOption.SRC_FLOW_ID);
|
||||
final String destFlowId = getRequiredArg(properties, CommandOption.FLOW_ID);
|
||||
|
||||
final NiFiRegistryClient srcClient = getSourceClient(client, srcPropsValue);
|
||||
|
||||
final String srcBucketId = getBucketId(srcClient, srcFlowId);
|
||||
final String destBucketId = getBucketId(client, destFlowId);
|
||||
|
||||
final List<Integer> srcVersions = getVersions(srcClient, srcBucketId, srcFlowId);
|
||||
final List<Integer> destVersions = getVersions(client, destBucketId, destFlowId);
|
||||
|
||||
if (destVersions.size() > srcVersions.size()) {
|
||||
throw new NiFiRegistryException("Destination flow has more versions than source flow");
|
||||
}
|
||||
|
||||
srcVersions.removeAll(destVersions);
|
||||
|
||||
if (srcVersions.isEmpty()) {
|
||||
if (getContext().isInteractive()) {
|
||||
println();
|
||||
println("Source and destination already in sync");
|
||||
}
|
||||
return new OkResult(getContext().isInteractive());
|
||||
}
|
||||
|
||||
// the REST API returns versions in decreasing order, but we want them in increasing order
|
||||
Collections.sort(srcVersions);
|
||||
|
||||
for (final Integer srcVersion : srcVersions) {
|
||||
final VersionedFlowSnapshot srcFlowSnapshot = srcClient.getFlowSnapshotClient().get(srcBucketId, srcFlowId, srcVersion);
|
||||
srcFlowSnapshot.setFlow(null);
|
||||
srcFlowSnapshot.setBucket(null);
|
||||
|
||||
final VersionedFlowSnapshotMetadata destMetadata = new VersionedFlowSnapshotMetadata();
|
||||
destMetadata.setBucketIdentifier(destBucketId);
|
||||
destMetadata.setFlowIdentifier(destFlowId);
|
||||
destMetadata.setVersion(srcVersion);
|
||||
destMetadata.setComments(srcFlowSnapshot.getSnapshotMetadata().getComments());
|
||||
|
||||
srcFlowSnapshot.setSnapshotMetadata(destMetadata);
|
||||
client.getFlowSnapshotClient().create(srcFlowSnapshot);
|
||||
|
||||
if (getContext().isInteractive()) {
|
||||
println();
|
||||
println("Synced version " + srcVersion);
|
||||
}
|
||||
}
|
||||
|
||||
return new OkResult(getContext().isInteractive());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
|
||||
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryClient;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
|
||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.OkResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.StringResult;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
public class TransferFlowVersion extends AbstractNiFiRegistryCommand<StringResult> {
|
||||
|
||||
public TransferFlowVersion() {
|
||||
super("transfer-flow-version", StringResult.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Transfers a version of a flow directly from one flow to another, without needing to export/import. " +
|
||||
"If --" + CommandOption.SRC_PROPS.getLongName() + " is not specified, the source flow is " +
|
||||
"assumed to be in the same registry as the destination flow. " +
|
||||
"If --" + CommandOption.SRC_FLOW_VERSION.getLongName() + " is not specified, then the latest " +
|
||||
"version will be transferred.";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInitialize(final Context context) {
|
||||
// source properties
|
||||
addOption(CommandOption.SRC_PROPS.createOption());
|
||||
|
||||
// source flow id
|
||||
addOption(CommandOption.SRC_FLOW_ID.createOption());
|
||||
|
||||
// optional version of source flow, otherwise latest
|
||||
addOption(CommandOption.SRC_FLOW_VERSION.createOption());
|
||||
|
||||
// destination flow id
|
||||
addOption(CommandOption.FLOW_ID.createOption());
|
||||
|
||||
// destination properties will come from standard -p or nifi.reg.props in session
|
||||
}
|
||||
|
||||
@Override
|
||||
public StringResult doExecute(final NiFiRegistryClient client, final Properties properties)
|
||||
throws IOException, NiFiRegistryException, ParseException {
|
||||
|
||||
final String srcPropsValue = getArg(properties, CommandOption.SRC_PROPS);
|
||||
final String srcFlowId = getRequiredArg(properties, CommandOption.SRC_FLOW_ID);
|
||||
final Integer srcFlowVersion = getIntArg(properties, CommandOption.SRC_FLOW_VERSION);
|
||||
final String destFlowId = getRequiredArg(properties, CommandOption.FLOW_ID);
|
||||
|
||||
final NiFiRegistryClient srcClient = getSourceClient(client, srcPropsValue);
|
||||
|
||||
// determine the bucket ids of the source and dest flows
|
||||
final String srcBucketId = getBucketId(srcClient, srcFlowId);
|
||||
final String destBucketId = getBucketId(client, destFlowId);
|
||||
|
||||
// get the snapshot of the source flow, either the version specified or the latest
|
||||
final VersionedFlowSnapshot srcSnapshot;
|
||||
if (srcFlowVersion == null) {
|
||||
srcSnapshot = srcClient.getFlowSnapshotClient().getLatest(srcBucketId, srcFlowId);
|
||||
} else {
|
||||
srcSnapshot = srcClient.getFlowSnapshotClient().get(srcBucketId, srcFlowId, srcFlowVersion);
|
||||
}
|
||||
|
||||
// determine the next version number for the destination flow
|
||||
final List<Integer> destVersions = getVersions(client, destBucketId, destFlowId);
|
||||
final Integer destFlowVersion = destVersions.isEmpty() ? 1 : destVersions.get(0) + 1;
|
||||
|
||||
// create the new metadata for the destination snapshot
|
||||
final VersionedFlowSnapshotMetadata destMetadata = new VersionedFlowSnapshotMetadata();
|
||||
destMetadata.setBucketIdentifier(destBucketId);
|
||||
destMetadata.setFlowIdentifier(destFlowId);
|
||||
destMetadata.setVersion(destFlowVersion);
|
||||
destMetadata.setComments(srcSnapshot.getSnapshotMetadata().getComments());
|
||||
|
||||
// update the source snapshot with the destination metadata
|
||||
srcSnapshot.setFlow(null);
|
||||
srcSnapshot.setBucket(null);
|
||||
srcSnapshot.setSnapshotMetadata(destMetadata);
|
||||
|
||||
// create the destination snapshot
|
||||
client.getFlowSnapshotClient().create(srcSnapshot);
|
||||
|
||||
if (getContext().isInteractive()) {
|
||||
println();
|
||||
println("Transferred version " + srcSnapshot.getSnapshotMetadata().getVersion()
|
||||
+ " of source flow to version " + destFlowVersion + " of destination flow");
|
||||
}
|
||||
|
||||
return new OkResult(getContext().isInteractive());
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -50,9 +50,9 @@ public class GetVariable extends AbstractCommand<StringResult> {
|
||||
try {
|
||||
final String value = session.get(args[0]);
|
||||
if (value == null) {
|
||||
return new StringResult("");
|
||||
return new StringResult("", getContext().isInteractive());
|
||||
} else {
|
||||
return new StringResult(value);
|
||||
return new StringResult(value, getContext().isInteractive());
|
||||
}
|
||||
} catch (SessionException se) {
|
||||
throw new CommandException(se.getMessage(), se);
|
||||
|
@ -21,7 +21,9 @@ import org.apache.nifi.registry.bucket.Bucket;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.api.ReferenceResolver;
|
||||
import org.apache.nifi.toolkit.cli.api.Referenceable;
|
||||
import org.apache.nifi.toolkit.cli.api.ResolvedReference;
|
||||
import org.apache.nifi.toolkit.cli.api.ResultType;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
|
||||
@ -84,13 +86,10 @@ public class BucketsResult extends AbstractWritableResult<List<Bucket>> implemen
|
||||
|
||||
return new ReferenceResolver() {
|
||||
@Override
|
||||
public String resolve(final Integer position) {
|
||||
public ResolvedReference resolve(final CommandOption option, final Integer position) {
|
||||
final Bucket bucket = backRefs.get(position);
|
||||
if (bucket != null) {
|
||||
if (context.isInteractive()) {
|
||||
context.getOutput().printf("Using a positional back-reference for '%s'%n", bucket.getName());
|
||||
}
|
||||
return bucket.getIdentifier();
|
||||
return new ResolvedReference(option, position, bucket.getName(), bucket.getIdentifier());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.result;
|
||||
|
||||
/**
|
||||
* Result for commands that want to print 'OK' for a successful command.
|
||||
*/
|
||||
public class OkResult extends StringResult {
|
||||
|
||||
public static final String OK_VALUE = "OK";
|
||||
|
||||
public OkResult(final boolean isInteractive) {
|
||||
super(OK_VALUE, isInteractive);
|
||||
}
|
||||
|
||||
}
|
@ -20,7 +20,9 @@ import org.apache.commons.lang3.Validate;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.api.ReferenceResolver;
|
||||
import org.apache.nifi.toolkit.cli.api.Referenceable;
|
||||
import org.apache.nifi.toolkit.cli.api.ResolvedReference;
|
||||
import org.apache.nifi.toolkit.cli.api.ResultType;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
|
||||
@ -90,13 +92,10 @@ public class ProcessGroupsResult extends AbstractWritableResult<List<ProcessGrou
|
||||
|
||||
return new ReferenceResolver() {
|
||||
@Override
|
||||
public String resolve(final Integer position) {
|
||||
public ResolvedReference resolve(final CommandOption option, final Integer position) {
|
||||
final ProcessGroupDTO pg = backRefs.get(position);
|
||||
if (pg != null) {
|
||||
if (context.isInteractive()) {
|
||||
context.getOutput().printf("Using a positional back-reference for '%s'%n", pg.getName());
|
||||
}
|
||||
return pg.getId();
|
||||
return new ResolvedReference(option, position, pg.getName(), pg.getId());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -27,9 +27,11 @@ import java.io.PrintStream;
|
||||
public class StringResult implements WritableResult<String> {
|
||||
|
||||
private final String value;
|
||||
private final boolean isInteractive;
|
||||
|
||||
public StringResult(final String value) {
|
||||
public StringResult(final String value, final boolean isInteractive) {
|
||||
this.value = value;
|
||||
this.isInteractive = isInteractive;
|
||||
Validate.notNull(this.value);
|
||||
}
|
||||
|
||||
@ -40,6 +42,12 @@ public class StringResult implements WritableResult<String> {
|
||||
|
||||
@Override
|
||||
public void write(final PrintStream output) {
|
||||
if (isInteractive) {
|
||||
output.println();
|
||||
}
|
||||
output.println(value);
|
||||
if (isInteractive) {
|
||||
output.println();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,9 @@ import org.apache.nifi.registry.flow.VersionedFlow;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.api.ReferenceResolver;
|
||||
import org.apache.nifi.toolkit.cli.api.Referenceable;
|
||||
import org.apache.nifi.toolkit.cli.api.ResolvedReference;
|
||||
import org.apache.nifi.toolkit.cli.api.ResultType;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
|
||||
@ -84,13 +86,14 @@ public class VersionedFlowsResult extends AbstractWritableResult<List<VersionedF
|
||||
|
||||
return new ReferenceResolver() {
|
||||
@Override
|
||||
public String resolve(final Integer position) {
|
||||
public ResolvedReference resolve(final CommandOption option, final Integer position) {
|
||||
final VersionedFlow versionedFlow = backRefs.get(position);
|
||||
if (versionedFlow != null) {
|
||||
if (context.isInteractive()) {
|
||||
context.getOutput().printf("Using a positional backreference for '%s'%n", versionedFlow.getName());
|
||||
if (option != null && option == CommandOption.BUCKET_ID) {
|
||||
return new ResolvedReference(option, position, versionedFlow.getBucketName(), versionedFlow.getBucketIdentifier());
|
||||
} else {
|
||||
return new ResolvedReference(option, position, versionedFlow.getName(), versionedFlow.getIdentifier());
|
||||
}
|
||||
return versionedFlow.getIdentifier();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.command;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.nifi.toolkit.cli.api.Command;
|
||||
import org.apache.nifi.toolkit.cli.api.CommandException;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class CommandA implements Command<CommandAResult> {
|
||||
|
||||
private final List<String> results;
|
||||
|
||||
private CommandLine cli;
|
||||
|
||||
public CommandA(final List<String> results) {
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(Context context) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "command-a";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "command-a";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Options getOptions() {
|
||||
Options options = new Options();
|
||||
options.addOption(CommandOption.BUCKET_ID.createOption());
|
||||
options.addOption(CommandOption.FLOW_ID.createOption());
|
||||
return options;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void printUsage(String errorMessage) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public CommandAResult execute(CommandLine cli) throws CommandException {
|
||||
this.cli = cli;
|
||||
return new CommandAResult(results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<CommandAResult> getResultImplType() {
|
||||
return CommandAResult.class;
|
||||
}
|
||||
|
||||
public CommandLine getCli() {
|
||||
return this.cli;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.command;
|
||||
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.api.ReferenceResolver;
|
||||
import org.apache.nifi.toolkit.cli.api.Referenceable;
|
||||
import org.apache.nifi.toolkit.cli.api.ResolvedReference;
|
||||
import org.apache.nifi.toolkit.cli.api.Result;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class CommandAResult implements Result<List<String>>, Referenceable {
|
||||
|
||||
private final List<String> results;
|
||||
|
||||
public CommandAResult(final List<String> results) {
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getResult() {
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceResolver createReferenceResolver(Context context) {
|
||||
return new ReferenceResolver() {
|
||||
@Override
|
||||
public ResolvedReference resolve(CommandOption option, Integer position) {
|
||||
if (position != null && position <= results.size()) {
|
||||
return new ResolvedReference(option, position, "CommandA", results.get(position - 1));
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.command;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class TestCommandProcessor {
|
||||
|
||||
@Test
|
||||
public void testCommandProcessor() throws ParseException {
|
||||
final List<String> results = new ArrayList<>();
|
||||
results.add("foo1");
|
||||
results.add("foo2");
|
||||
|
||||
final CommandA command = new CommandA(results);
|
||||
|
||||
final Context context = Mockito.mock(Context.class);
|
||||
Mockito.when(context.getOutput()).thenReturn(System.out);
|
||||
|
||||
// run the command once to set the previous results
|
||||
final CommandProcessor processor = new CommandProcessor(Collections.emptyMap(), Collections.emptyMap(), context);
|
||||
processor.processCommand(new String[] {}, command);
|
||||
|
||||
// run it again and &1 should be resolved to foo1
|
||||
processor.processCommand(
|
||||
new String[] {
|
||||
"-" + CommandOption.BUCKET_ID.getShortName(),
|
||||
"&1"
|
||||
} ,
|
||||
command);
|
||||
|
||||
final CommandLine cli1 = command.getCli();
|
||||
Assert.assertEquals("foo1", cli1.getOptionValue(CommandOption.BUCKET_ID.getShortName()));
|
||||
|
||||
// run it again and &2 should be resolved to foo1
|
||||
processor.processCommand(
|
||||
new String[] {
|
||||
"-" + CommandOption.BUCKET_ID.getShortName(),
|
||||
"&2"
|
||||
},
|
||||
command);
|
||||
|
||||
final CommandLine cli2 = command.getCli();
|
||||
Assert.assertEquals("foo2", cli2.getOptionValue(CommandOption.BUCKET_ID.getShortName()));
|
||||
|
||||
// run it again and &1 should be resolved to foo1
|
||||
processor.processCommand(
|
||||
new String[] {
|
||||
"-" + CommandOption.BUCKET_ID.getShortName(),
|
||||
"b1",
|
||||
"-" + CommandOption.FLOW_ID.getShortName(),
|
||||
"&1"
|
||||
},
|
||||
command);
|
||||
|
||||
final CommandLine cli3 = command.getCli();
|
||||
Assert.assertEquals("foo1", cli3.getOptionValue(CommandOption.FLOW_ID.getShortName()));
|
||||
}
|
||||
|
||||
}
|
@ -17,10 +17,14 @@
|
||||
package org.apache.nifi.toolkit.cli.impl.result;
|
||||
|
||||
import org.apache.nifi.registry.flow.VersionedFlow;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.api.ReferenceResolver;
|
||||
import org.apache.nifi.toolkit.cli.api.ResultType;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@ -34,29 +38,34 @@ public class TestVersionedFlowsResult {
|
||||
|
||||
private ByteArrayOutputStream outputStream;
|
||||
private PrintStream printStream;
|
||||
private List<VersionedFlow> flows;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.outputStream = new ByteArrayOutputStream();
|
||||
this.printStream = new PrintStream(outputStream, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteSimpleVersionedFlowsResult() throws IOException {
|
||||
final VersionedFlow f1 = new VersionedFlow();
|
||||
f1.setName("Flow 1");
|
||||
f1.setDescription("This is flow 1");
|
||||
f1.setIdentifier(UUID.fromString("ea752054-22c6-4fc0-b851-967d9a3837cb").toString());
|
||||
f1.setBucketIdentifier("b1");
|
||||
f1.setBucketName("Bucket 1");
|
||||
|
||||
final VersionedFlow f2 = new VersionedFlow();
|
||||
f2.setName("Flow 2");
|
||||
f2.setDescription(null);
|
||||
f2.setIdentifier(UUID.fromString("ddf5f289-7502-46df-9798-4b0457c1816b").toString());
|
||||
f2.setBucketIdentifier("b2");
|
||||
f2.setBucketName("Bucket 2");
|
||||
|
||||
final List<VersionedFlow> flows = new ArrayList<>();
|
||||
this.flows = new ArrayList<>();
|
||||
flows.add(f1);
|
||||
flows.add(f2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteSimpleVersionedFlowsResult() throws IOException {
|
||||
final VersionedFlowsResult result = new VersionedFlowsResult(ResultType.SIMPLE, flows);
|
||||
result.write(printStream);
|
||||
|
||||
@ -73,4 +82,25 @@ public class TestVersionedFlowsResult {
|
||||
Assert.assertEquals(expected, resultOut);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReferenceResolver() {
|
||||
final VersionedFlowsResult result = new VersionedFlowsResult(ResultType.SIMPLE, flows);
|
||||
final ReferenceResolver resolver = result.createReferenceResolver(Mockito.mock(Context.class));
|
||||
|
||||
// should default to flow id when no option is specified
|
||||
Assert.assertEquals("ea752054-22c6-4fc0-b851-967d9a3837cb", resolver.resolve(null, 1).getResolvedValue());
|
||||
Assert.assertEquals("ddf5f289-7502-46df-9798-4b0457c1816b", resolver.resolve(null, 2).getResolvedValue());
|
||||
|
||||
// should use flow id when flow id is specified
|
||||
Assert.assertEquals("ea752054-22c6-4fc0-b851-967d9a3837cb", resolver.resolve(CommandOption.FLOW_ID, 1).getResolvedValue());
|
||||
Assert.assertEquals("ddf5f289-7502-46df-9798-4b0457c1816b", resolver.resolve(CommandOption.FLOW_ID, 2).getResolvedValue());
|
||||
|
||||
// should resolve the bucket id when bucket id option is used
|
||||
Assert.assertEquals("b1", resolver.resolve(CommandOption.BUCKET_ID, 1).getResolvedValue());
|
||||
Assert.assertEquals("b2", resolver.resolve(CommandOption.BUCKET_ID, 2).getResolvedValue());
|
||||
|
||||
// should resolve to null when position doesn't exist
|
||||
Assert.assertEquals(null, resolver.resolve(CommandOption.FLOW_ID, 3));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user