From 8245bc3f804f701da52a4e64a000b04aff718948 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 21 May 2019 11:03:21 -0400 Subject: [PATCH] NIFI-5922: Ensure that we import any default variable values on flow import --- ...ssControllerServiceInitializationContext.java | 3 ++- .../nifi/stateless/core/StatelessFlow.java | 16 ++++++++++++---- .../core/StatelessValidationContext.java | 3 ++- .../apache/nifi/stateless/runtimes/Program.java | 6 ++++-- .../openwhisk/StatelessNiFiOpenWhiskAction.java | 9 ++++++++- .../apache/nifi/stateless/core/BatchTest.java | 3 ++- 6 files changed, 30 insertions(+), 10 deletions(-) diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java index b0801f01f3..70f4ca7d6d 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java @@ -77,4 +77,5 @@ public class StatelessControllerServiceInitializationContext implements Controll public File getKerberosConfigurationFile() { return null; //this needs to be wired in. - }} + } +} diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java index 2b17376fa6..0418429d24 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java @@ -403,18 +403,26 @@ public class StatelessFlow implements RunnableFlow { args.getAsJsonArray(FAILUREPORTS).forEach(port ->failurePorts.add(port.getAsString())); } + final SSLContext sslContext = getSSLContext(args); + final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion); + final Map inputVariables = new HashMap<>(); + final VersionedProcessGroup versionedGroup = snapshot.getFlowContents(); + if (versionedGroup != null) { + for (final Map.Entry entry : versionedGroup.getVariables().entrySet()) { + final String variableName = entry.getKey(); + final String variableValue = entry.getValue(); + inputVariables.put(new VariableDescriptor(variableName), variableValue); + } + } if (args.has(VARIABLES)) { final JsonElement variablesElement = args.get(VARIABLES); final JsonObject variablesObject = variablesElement.getAsJsonObject(); variablesObject.entrySet() - .forEach(entry ->inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString())); + .forEach(entry -> inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString())); } - final SSLContext sslContext = getSSLContext(args); - - final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion); final ExtensionManager extensionManager = ExtensionDiscovery.discover(narWorkingDir, systemClassLoader); final StatelessFlow flow = new StatelessFlow(snapshot.getFlowContents(), extensionManager, () -> inputVariables, failurePorts, materializeContent, sslContext); diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java index 62021098a5..6dcb610ce7 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java @@ -40,7 +40,8 @@ public class StatelessValidationContext implements ValidationContext { private final VariableRegistry variableRegistry; private final StatelessProcessContext processContext; - public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager, final VariableRegistry variableRegistry) { + public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager, + final VariableRegistry variableRegistry) { this.processContext = processContext; this.lookup = lookup; this.stateManager = stateManager; diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java index e26d0a4ef8..100aad3e19 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java @@ -30,7 +30,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.*; +import java.util.LinkedList; +import java.util.Queue; public class Program { @@ -168,7 +169,8 @@ public class Program { System.out.println(); System.out.println("Notes:"); System.out.println(" 1) The configuration file must be in JSON format. "); - System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + ", " + StatelessFlow.FLOWID + "."); + System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + + ", " + StatelessFlow.FLOWID + "."); System.out.println(" All other attributes will be passed to the flow using the variable registry interface"); System.out.println(); } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java index 849ce135d9..7927d389b0 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java @@ -25,7 +25,14 @@ import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; import org.apache.nifi.stateless.bootstrap.RunnableFlow; import org.apache.nifi.stateless.core.StatelessFlow; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.LinkedList; diff --git a/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java b/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java index 286bb2ceb8..15a0f008cf 100644 --- a/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java +++ b/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java @@ -60,7 +60,8 @@ public class BatchTest { /////////////////////////////////////////// // Build Flow /////////////////////////////////////////// - StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry, materializeData, ClassLoader.getSystemClassLoader()); + StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry, + materializeData, ClassLoader.getSystemClassLoader()); getFile.setProperty(GetFile.DIRECTORY,"/tmp/nifistateless/input/"); getFile.setProperty(GetFile.FILE_FILTER,"test.txt"); getFile.setProperty(GetFile.KEEP_SOURCE_FILE,"true");