NIFI-5922: Ensure that we import any default variable values on flow import

This commit is contained in:
Mark Payne 2019-05-21 11:03:21 -04:00
parent 650c6aa820
commit 8245bc3f80
6 changed files with 30 additions and 10 deletions

View File

@ -77,4 +77,5 @@ public class StatelessControllerServiceInitializationContext implements Controll
public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}}
}
}

View File

@ -403,18 +403,26 @@ public class StatelessFlow implements RunnableFlow {
args.getAsJsonArray(FAILUREPORTS).forEach(port ->failurePorts.add(port.getAsString()));
}
final SSLContext sslContext = getSSLContext(args);
final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion);
final Map<VariableDescriptor, String> inputVariables = new HashMap<>();
final VersionedProcessGroup versionedGroup = snapshot.getFlowContents();
if (versionedGroup != null) {
for (final Map.Entry<String, String> entry : versionedGroup.getVariables().entrySet()) {
final String variableName = entry.getKey();
final String variableValue = entry.getValue();
inputVariables.put(new VariableDescriptor(variableName), variableValue);
}
}
if (args.has(VARIABLES)) {
final JsonElement variablesElement = args.get(VARIABLES);
final JsonObject variablesObject = variablesElement.getAsJsonObject();
variablesObject.entrySet()
.forEach(entry ->inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString()));
.forEach(entry -> inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString()));
}
final SSLContext sslContext = getSSLContext(args);
final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion);
final ExtensionManager extensionManager = ExtensionDiscovery.discover(narWorkingDir, systemClassLoader);
final StatelessFlow flow = new StatelessFlow(snapshot.getFlowContents(), extensionManager, () -> inputVariables, failurePorts, materializeContent, sslContext);

View File

@ -40,7 +40,8 @@ public class StatelessValidationContext implements ValidationContext {
private final VariableRegistry variableRegistry;
private final StatelessProcessContext processContext;
public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager, final VariableRegistry variableRegistry) {
public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager,
final VariableRegistry variableRegistry) {
this.processContext = processContext;
this.lookup = lookup;
this.stateManager = stateManager;

View File

@ -30,7 +30,8 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.LinkedList;
import java.util.Queue;
public class Program {
@ -168,7 +169,8 @@ public class Program {
System.out.println();
System.out.println("Notes:");
System.out.println(" 1) The configuration file must be in JSON format. ");
System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + ", " + StatelessFlow.FLOWID + ".");
System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID
+ ", " + StatelessFlow.FLOWID + ".");
System.out.println(" All other attributes will be passed to the flow using the variable registry interface");
System.out.println();
}

View File

@ -25,7 +25,14 @@ import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.stateless.bootstrap.RunnableFlow;
import org.apache.nifi.stateless.core.StatelessFlow;
import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;

View File

@ -60,7 +60,8 @@ public class BatchTest {
///////////////////////////////////////////
// Build Flow
///////////////////////////////////////////
StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry, materializeData, ClassLoader.getSystemClassLoader());
StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry,
materializeData, ClassLoader.getSystemClassLoader());
getFile.setProperty(GetFile.DIRECTORY,"/tmp/nifistateless/input/");
getFile.setProperty(GetFile.FILE_FILTER,"test.txt");
getFile.setProperty(GetFile.KEEP_SOURCE_FILE,"true");