NIFI-8644: Introduced a notion of ParameterProviderDefinition

- Refactored stateless to use this when creating a dataflow so that Parameter Provider implementations can be externalized into NARs. Also updated ExtensionDiscoveringManager such that callers are able to provide a new type of class to be discovered (e.g., ParameterProvider) so that the extensions will be automatically discovered
- Put specific command-line overrides as highest precedence for parameter overrides
- Make ParameterOverrideProvider valid by allowing for dynamically added parameters
- Fixed bug in validation logic, added new system tests to verify proper handling of Required and Optional properties
- Addressed review feedback and fixed some bugs. Also added system test to verify Parameter Providers are working as expected

This closes #5113

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2021-06-01 10:00:22 -04:00 committed by exceptionfactory
parent ace27e5f69
commit 6df07df3b2
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
37 changed files with 1151 additions and 181 deletions

View File

@ -21,13 +21,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
import org.apache.nifi.stateless.bootstrap.CompositeParameterProvider;
import org.apache.nifi.stateless.bootstrap.EnvironmentVariableParameterProvider;
import org.apache.nifi.stateless.bootstrap.ParameterOverrideProvider;
import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.SslContextDefinition;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
@ -187,13 +183,8 @@ public class StatelessKafkaConnectorUtil {
unpackNarLock.unlock();
}
dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties);
final ParameterProvider configurationParameterProvider = new ParameterOverrideProvider(parameterOverrides);
final ParameterProvider environmentVariableProvider = new EnvironmentVariableParameterProvider();
final ParameterProvider compositeParameterProvider = new CompositeParameterProvider(Arrays.asList(configurationParameterProvider, environmentVariableProvider));
return bootstrap.createDataflow(dataflowDefinition, compositeParameterProvider);
dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
return bootstrap.createDataflow(dataflowDefinition);
} catch (final Exception e) {
throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.resource.ResourceContext;
import org.apache.nifi.components.resource.StandardResourceContext;
import org.apache.nifi.components.resource.StandardResourceReferenceFactory;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.registry.VariableRegistry;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
public class StandardPropertyContext implements PropertyContext {
private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
private final Map<PropertyDescriptor, String> properties;
private final ConfigurableComponent component;
public StandardPropertyContext(final Map<PropertyDescriptor, String> effectivePropertyValues, final ConfigurableComponent component) {
this.properties = effectivePropertyValues;
this.preparedQueries = new HashMap<>();
this.component = component;
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final PropertyDescriptor desc = entry.getKey();
String value = entry.getValue();
if (value == null) {
value = desc.getDefaultValue();
}
final PreparedQuery pq = Query.prepareWithParametersPreEvaluated(value);
preparedQueries.put(desc, pq);
}
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);
// We need to get the 'canonical representation' of the property descriptor from the component itself,
// since the supplied PropertyDescriptor may not have the proper default value.
final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
final String resolvedValue = (configuredValue == null) ? resolvedDescriptor.getDefaultValue() : configuredValue;
final ResourceContext resourceContext = new StandardResourceContext(new StandardResourceReferenceFactory(), property);
return new StandardPropertyValue(resourceContext, resolvedValue, null, ParameterLookup.EMPTY, preparedQueries.get(property), VariableRegistry.EMPTY_REGISTRY);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
}

View File

@ -53,6 +53,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@ -86,6 +87,10 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
private final Map<String, InstanceClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>();
public StandardExtensionDiscoveringManager() {
this(Collections.emptyList());
}
public StandardExtensionDiscoveringManager(final Collection<Class<? extends ConfigurableComponent>> additionalExtensionTypes) {
definitionMap.put(Processor.class, new HashSet<>());
definitionMap.put(FlowFilePrioritizer.class, new HashSet<>());
definitionMap.put(ReportingTask.class, new HashSet<>());
@ -102,6 +107,8 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
definitionMap.put(StateProvider.class, new HashSet<>());
definitionMap.put(StatusAnalyticsModel.class, new HashSet<>());
definitionMap.put(NarProvider.class, new HashSet<>());
additionalExtensionTypes.forEach(type -> definitionMap.putIfAbsent(type, new HashSet<>()));
}
@Override
@ -250,10 +257,11 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
protected void initializeTempComponent(final ConfigurableComponent configurableComponent) {
ConfigurableComponentInitializer initializer = null;
try {
initializer = ConfigurableComponentInitializerFactory.createComponentInitializer(this, configurableComponent.getClass());
initializer.initialize(configurableComponent);
final ConfigurableComponentInitializer initializer = ConfigurableComponentInitializerFactory.createComponentInitializer(this, configurableComponent.getClass());
if (initializer != null) {
initializer.initialize(configurableComponent);
}
} catch (final InitializationException e) {
logger.warn(String.format("Unable to initialize component %s due to %s", configurableComponent.getClass().getName(), e.getMessage()));
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.config;
public class ConfigurableExtensionDefinition {
private String name;
private String type;
private String bundleCoordinates;
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
public String getBundleCoordinates() {
return bundleCoordinates;
}
public void setBundleCoordinates(final String bundleCoordinates) {
this.bundleCoordinates = bundleCoordinates;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
}

View File

@ -15,18 +15,19 @@
* limitations under the License.
*/
package org.apache.nifi.stateless.bootstrap;
package org.apache.nifi.stateless.config;
import org.apache.nifi.stateless.config.ParameterProvider;
import java.util.HashMap;
import java.util.Map;
public class EmptyParameterProvider implements ParameterProvider {
@Override
public String getParameterValue(final String contextName, final String parameterName) {
return null;
public class ParameterProviderDefinition extends ConfigurableExtensionDefinition {
private Map<String, String> propertyValues = new HashMap<>();
public Map<String, String> getPropertyValues() {
return propertyValues;
}
@Override
public boolean isParameterDefined(final String contextName, final String parameterName) {
return false;
public void setPropertyValues(final Map<String, String> propertyValues) {
this.propertyValues = propertyValues;
}
}

View File

@ -20,37 +20,10 @@ package org.apache.nifi.stateless.config;
import java.util.HashMap;
import java.util.Map;
public class ReportingTaskDefinition {
private String name;
private String type;
private String bundleCoordinates;
public class ReportingTaskDefinition extends ConfigurableExtensionDefinition {
private String schedulingFrequency;
private Map<String, String> propertyValues = new HashMap<>();
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
public String getBundleCoordinates() {
return bundleCoordinates;
}
public void setBundleCoordinates(final String bundleCoordinates) {
this.bundleCoordinates = bundleCoordinates;
}
public String getSchedulingFrequency() {
return schedulingFrequency;
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.stateless.flow;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import java.util.List;
@ -34,5 +35,7 @@ public interface DataflowDefinition<T> {
List<ReportingTaskDefinition> getReportingTaskDefinitions();
List<ParameterProviderDefinition> getParameterProviderDefinitions();
TransactionThresholds getTransactionThresholds();
}

View File

@ -17,15 +17,19 @@
package org.apache.nifi.stateless.flow;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public interface DataflowDefinitionParser {
DataflowDefinition<?> parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration) throws StatelessConfigurationException, IOException;
DataflowDefinition<?> parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException;
DataflowDefinition<?> parseFlowDefinition(Map<String, String> configurationProperties, StatelessEngineConfiguration engineConfiguration) throws StatelessConfigurationException, IOException;
DataflowDefinition<?> parseFlowDefinition(Map<String, String> configurationProperties, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException;
}

View File

@ -17,13 +17,11 @@
package org.apache.nifi.stateless.flow;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import java.io.IOException;
public interface StatelessDataflowFactory<T> {
StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition<T> dataflowDefinition,
ParameterProvider parameterProvider) throws IOException, StatelessConfigurationException;
StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition<T> dataflowDefinition) throws IOException, StatelessConfigurationException;
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.parameter;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.context.PropertyContext;
public abstract class AbstractParameterProvider extends AbstractConfigurableComponent implements ParameterProvider {
private ParameterProviderInitializationContext context;
@Override
public final void initialize(final ParameterProviderInitializationContext context) {
this.context = context;
init(context);
}
@Override
public final String getIdentifier() {
return context == null ? "<Unknown ID>" : context.getIdentifier();
}
/**
* Provides PropertyContext to subclasses
*/
protected final PropertyContext getPropertyContext() {
return context;
}
/**
* An empty method that is intended for subclasses to optionally override in order to provide initialization
* @param context the initialization context
*/
protected void init(ParameterProviderInitializationContext context) {
}
}

View File

@ -15,9 +15,13 @@
* limitations under the License.
*/
package org.apache.nifi.stateless.config;
package org.apache.nifi.stateless.parameter;
public interface ParameterProvider {
import org.apache.nifi.components.ConfigurableComponent;
public interface ParameterProvider extends ConfigurableComponent {
void initialize(ParameterProviderInitializationContext context);
/**
* Given a Parameter Context Name and a Parameter Name, returns the value of the parameter

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.parameter;
import org.apache.nifi.context.PropertyContext;
public interface ParameterProviderInitializationContext extends PropertyContext {
String getIdentifier();
}

View File

@ -304,6 +304,10 @@ nifi.stateless.parameters.kafka.Kafka Brokers=kafka-01:9092,kafka-02:9092,kafka-
Note that while Java properties files typically do not allow for spaces in property names, Stateless parses the properties
files in a way that does allow for spaces, so that Parameter names, etc. may allow for spaces.
There are times, however, when we do not want to provide the list of Parameters in the dataflow properties file. We may want to fetch the Parameters from some file or
an external service. For this reason, Stateless supports a notion of a Parameter Provider. A Parameter Provider is an extension point that can be used to retrieve Parameters
from elsewhere. For information on how to configure Parameter Provider, see the [Passing Parameters](#passing-parameters) section below.
When a stateless dataflow is triggered, it can also be important to consider how much data should be allowed to enter the dataflow for a given invocation.
Typically, this consists of a single FlowFile at a time or a single batch of FlowFiles at a time, depending on the source processor. However, some processors may
require additional data in order to perform their tasks. For example, if we have a dataflow whose source processor brings in a single message from a JMS Queue, and
@ -427,28 +431,29 @@ Additionally, there may be sensitive parameters that users prefer not to include
Environment Variables, for example.
These parameters may be passed when running NiFi via the `bin/nifi.sh` script by passing a `-p` argument.
When used, the `-p` argument must be followed by an argument in the format `<context name>:<parameter name>:<parameter value>`
When used, the `-p` argument must be followed by an argument in the format `[<context name>:]<parameter name>=<parameter value>`
For example:
```
bin/nifi.sh stateless -c -p "Kafka Parameter Context:Kafka Topic:Sensor Data" /var/lib/nifi/stateless/config/stateless.properties /var/lib/nifi/stateless/flows/jms-to-kafka.properties
bin/nifi.sh stateless -c -p "Kafka Parameter Context:Kafka Topic=Sensor Data" /var/lib/nifi/stateless/config/stateless.properties /var/lib/nifi/stateless/flows/jms-to-kafka.properties
```
Note that because of the spaces in the Parameter/Context name and the Parameter value, the argument is quoted.
Multiple Parameters may be passed using this syntax:
```
bin/nifi.sh stateless -c -p "Kafka Parameter Context:Kafka Brokers:kafka-01:9092,kafka-02:9092,kafka-03:9092" -p "Kafka Parameter Context:Kafka Topic:Sensor Data" /var/lib/nifi/stateless/config /stateless.properties
bin/nifi.sh stateless -c -p "Kafka Parameter Context:Kafka Brokers=kafka-01:9092,kafka-02:9092,kafka-03:9092" -p "Kafka Parameter Context:Kafka Topic=Sensor Data" /var/lib/nifi/stateless/config /stateless.properties
```
Note also that the Parameter Context Name and the Parameter Name may not include a colon character.
The Parameter Value can include colon characters, as in the example here.
If the name of the Parameter Context contains a colon, it must be escaped using a backslash.
The name of the Parameter Context and the name of the Parameter may not include an equals sign (=).
The Parameter Value can include colon characters, as well as equals, as in the example here.
Often times, though, the Parameter Context name is not particularly important, and we just want to provide a Parameter name.
This can be done by simply leaving off the name of the Parameter Context. For example:
```
bin/nifi.sh stateless -c -p "Kafka Brokers:kafka-01:9092,kafka-02:9092,kafka-03:9092" -p "Kafka Topic:Sensor Data" /var/lib/nifi/stateless/config /stateless.properties
bin/nifi.sh stateless -c -p "Kafka Brokers=kafka-01:9092,kafka-02:9092,kafka-03:9092" -p "Kafka Topic=Sensor Data" /var/lib/nifi/stateless/config /stateless.properties
```
In this case, any Parameter Context that has a name of "Kafka Brokers" will have the parameter resolved to `kafka-01:9092,kafka-02:9092,kafka-03:9092`, regardless of the name
@ -456,4 +461,24 @@ of the Parameter Context.
If a given Parameter is referenced and is not defined using the `-p` syntax, an environment variable may also be used to provide the value. However, environment variables typically are
allowed to contain only letters, numbers, and underscores in their names. As a result, it is important that the Parameters' names also adhere to that same rule, or the environment variable
will not be addressable.
will not be addressable.
At times, none of the built-in capabilities for resolving Parameters are ideal, though. In these situations, we can use a custom Parameter Provider in order to source Parameter values from elsewhere.
To configure a custom Parameter Provider, we must configure it similarly to Reporting Tasks, using a common key to indicate which Parameter Provider the property belongs to.
The following properties are supported:
| Property Name | Description | Example Value |
|---------------|-------------|---------------|
| nifi.stateless.parameter.provider.\<key>.name | The name of the Parameter Provider | My Secret Parameter Provider
| nifi.stateless.parameter.provider.\<key>.type | The type of the Parameter Provider. This may be the fully qualified classname or the simple name, if only a single class exists with the simple name | MySecretParameterProvider |
| nifi.stateless.parameter.provider.\<key>.bundle | The bundle that holds the Parameter Provider. If not specified, the bundle will be automatically identified, if there exists exactly one bundle with the reporting task. However, if no Bundle is specified, none will be downloaded and if more than 1 is already available, the Parameter Provider cannot be created. The format is \<group id>:\<artifact id>:\<version> | org.apache.nifi:nifi-standard-nar:1.14.0 |
| nifi.stateless.parameter.provider.\<key>.properties.\<property name> | One or more Parameter Provider properties may be configured using this syntax | Any valid value for the corresponding property |
An example Parameter Provider might be configured as follows:
```
nifi.stateless.parameter.provider.Props File Provider.name=My Custom Properties File Parameter Provider
nifi.stateless.parameter.provider.Props File Provider.type=com.myorg.nifi.parameters.custom.MyCustomPropertiesFileParameterProvider
nifi.stateless.parameter.provider.Props File Provider.bundle=com.myorg:nifi-custom-parameter-provider-nar:0.0.1
nifi.stateless.parameter.provider.Props File Provider.properties.Filename=/tmp/parameters.properties
```

View File

@ -20,7 +20,6 @@ package org.apache.nifi.stateless.bootstrap;
import org.apache.nifi.stateless.config.ParameterOverride;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
@ -36,7 +35,7 @@ public class BootstrapConfiguration {
private static final String DEFAULT_ENGINE_CONFIG_FILE = "./conf/stateless.properties";
private static final String DEFAULT_FLOW_CONFIG_FILE = "./conf/env-flow-config.properties";
private static final Pattern PARAMETER_OVERRIDE_PATTERN = Pattern.compile("(?<!\\\\):");
private static final Pattern PARAMETER_OVERRIDE_PATTERN = Pattern.compile( "(?<!\\\\):" );
private static final String PARAMETER_OVERRIDE_FLAG = "-p";
private static final String RUN_CONTINUOUS_SHORT_FLAG = "-c";
private static final String RUN_CONTINUOUS_LONG_FLAG = "--continuous";
@ -60,16 +59,16 @@ public class BootstrapConfiguration {
System.out.println();
System.out.println();
System.out.println("Options:");
System.out.println(PARAMETER_OVERRIDE_FLAG + " <context name>:<parameter name>:<parameter value>");
System.out.println(PARAMETER_OVERRIDE_FLAG + " <context name>:<parameter name>=<parameter value>");
System.out.println(" Specifies a parameter value to use. If the parameter is present in the provided flow configuration file, the value provided here will take precedence.");
System.out.println(" For example, to specify that the 'Foo' parameter of the Parameter Context with name 'bar' should have a value of 'BAZ', use:");
System.out.println(" -p bar:Foo:BAZ");
System.out.println(" -p bar:Foo=BAZ");
System.out.println();
System.out.println(" Multiple Parameters may be specified in this way. For example:");
System.out.println(" -p bar:Foo:BAZ -p \"My Context:My Parameter:My Value\"");
System.out.println(" -p bar:Foo=BAZ -p \"My Context:My Parameter=My Value\"");
System.out.println();
System.out.println(" If a Parameter name or value or Parameter Context name has a colon in it, it may be escaped using the \\ character:");
System.out.println(" -p \"My Context:My Parameter:Use ratio of 1\\:1");
System.out.println(" If a Parameter Context name has a colon in it, it may be escaped using the \\ character. Parameter names and values do not need to be escaped:");
System.out.println(" -p \"My\\:Context:My:Parameter=Use ratio of 1:1");
System.out.println();
System.out.println(RUN_CONTINUOUS_SHORT_FLAG);
System.out.println(RUN_CONTINUOUS_LONG_FLAG);
@ -113,7 +112,7 @@ public class BootstrapConfiguration {
return runContinuous;
}
static BootstrapConfiguration fromCommandLineArgs(final String[] args) throws FileNotFoundException {
static BootstrapConfiguration fromCommandLineArgs(final String[] args) {
// Create configuration and parse arguments.
final BootstrapConfiguration configuration = new BootstrapConfiguration();
configuration.parseArguments(args);
@ -185,32 +184,34 @@ public class BootstrapConfiguration {
}
// Validate the Flow Configuration File
if (flowConfigFilename == null) {
throw new IllegalArgumentException(String.format("No Flow Conf Configuration File was specified - please specify a filename using the %s or %s command-line argument " +
"or by specifying the %s Environment Variable",
FLOW_CONFIGURATION_FILE_LONG_FLAG, FLOW_CONFIGURATION_FILE_SHORT_FLAG, FLOW_CONFIGURATION_FILE_ENV_VAR));
}
flowDefinitionFile = new File(flowConfigFilename);
if (!flowDefinitionFile.exists()) {
throw new IllegalArgumentException(String.format("Cannot find Flow Configuration File %s - please ensure that the file exists and appropriate permissions are in place for allowing " +
"access to the file, or otherwise specify a different filename using the %s or %s command-line argument or by specifying the %s Environment Variable",
flowDefinitionFile.getAbsolutePath(), FLOW_CONFIGURATION_FILE_LONG_FLAG, FLOW_CONFIGURATION_FILE_SHORT_FLAG, FLOW_CONFIGURATION_FILE_ENV_VAR));
}
}
private ParameterOverride parseOverride(final String argument) {
final String[] splits = argument.split(PARAMETER_OVERRIDE_PATTERN.pattern(), 3);
ParameterOverride parseOverride(final String argument) {
final String[] nameAndValueSplits = argument.split("=", 2);
if (nameAndValueSplits.length == 1) {
throw new IllegalArgumentException("Invalid parameter: argument has no equals sign: " + argument);
}
if (splits.length == 2) {
final String parameterName = splits[0].replace("\\:", ":");
final String parameterValue = splits[1].replace("\\:", ":");
return new ParameterOverride(parameterName, parameterValue);
} else if (splits.length == 3) {
final String contextAndParameterName = nameAndValueSplits[0];
if (contextAndParameterName.trim().isEmpty()) {
throw new IllegalArgumentException("Invalid parameter: argument has no parameter name: " + argument);
}
final String parameterValue = nameAndValueSplits[1];
final String[] splits = contextAndParameterName.split(PARAMETER_OVERRIDE_PATTERN.pattern(), 2);
if (splits.length == 1) {
return new ParameterOverride(contextAndParameterName, parameterValue);
} else if (splits.length == 2) {
final String contextName = splits[0].replace("\\:", ":");
final String parameterName = splits[1].replace("\\:", ":");
final String parameterValue = splits[2].replace("\\:", ":");
final String parameterName = splits[1];
return new ParameterOverride(contextName, parameterName, parameterValue);
}

View File

@ -18,7 +18,6 @@
package org.apache.nifi.stateless.bootstrap;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.PropertiesFileEngineConfigurationParser;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
@ -32,9 +31,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RunStatelessFlow {
private static final Logger logger = LoggerFactory.getLogger(RunStatelessFlow.class);
@ -82,19 +79,9 @@ public class RunStatelessFlow {
}
private static void triggerOnce(final StatelessDataflow dataflow) throws InterruptedException {
final long triggerStart = System.nanoTime();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
result.acknowledge();
final long triggerNanos = System.nanoTime() - triggerStart;
final long triggerMillis = TimeUnit.NANOSECONDS.toMillis(triggerNanos);
if (triggerMillis > 0) {
logger.info("Ran dataflow in {} millis", triggerMillis);
} else {
logger.info("Ran dataflow in {} nanoseconds", triggerNanos);
}
}
public static StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final File flowDefinitionFile, final List<ParameterOverride> parameterOverrides)
@ -102,13 +89,9 @@ public class RunStatelessFlow {
final long initializeStart = System.currentTimeMillis();
final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(engineConfiguration);
final DataflowDefinition<?> dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile);
final DataflowDefinition<?> dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides);
final ParameterProvider explicitParameterProvider = new ParameterOverrideProvider(parameterOverrides);
final ParameterProvider environmentParameterProvider = new EnvironmentVariableParameterProvider();
final ParameterProvider compositeProvider = new CompositeParameterProvider(Arrays.asList(explicitParameterProvider, environmentParameterProvider));
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition, compositeProvider);
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();
final StatelessDataflowValidation validation = dataflow.performValidation();

View File

@ -22,7 +22,7 @@ import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.NarUnpackLock;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
@ -60,22 +60,24 @@ public class StatelessBootstrap {
this.engineConfiguration = engineConfiguration;
}
public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition, final ParameterProvider parameterProvider)
public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition)
throws IOException, StatelessConfigurationException {
final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(statelessClassLoader, StatelessDataflowFactory.class);
final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, parameterProvider);
final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition);
return dataflow;
}
public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile) throws StatelessConfigurationException, IOException {
public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile, final List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException {
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration);
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides);
return dataflowDefinition;
}
public DataflowDefinition<?> parseDataflowDefinition(final Map<String, String> flowDefinitionProperties) throws StatelessConfigurationException, IOException {
public DataflowDefinition<?> parseDataflowDefinition(final Map<String, String> flowDefinitionProperties, final List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException {
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration);
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides);
return dataflowDefinition;
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.bootstrap;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.junit.Assert;
import org.junit.Test;
import static junit.framework.TestCase.assertEquals;
public class TestBootstrapConfiguration {
private final String engineConfigPropertiesFilename = "src/test/resources/nifi-stateless.properties";
@Test
public void testParseParameterOverride() {
final BootstrapConfiguration configuration = BootstrapConfiguration.fromCommandLineArgs(new String[] {"-e", engineConfigPropertiesFilename, "-f", engineConfigPropertiesFilename});
testOverride(configuration, "a:b=c", "a", "b", "c"); // simple case, context name, param name, param value, no special chars
testOverride(configuration, "a=b", null, "a", "b"); // test no context name
testOverride(configuration, "a\\:b:c=d", "a:b", "c", "d"); // test escaped colon in context name
testOverride(configuration, "a:b:c=d", "a", "b:c", "d"); // test colon in param name
testOverride(configuration, "a=b:c", null, "a", "b:c"); // test colon in param value
testOverride(configuration, "a=b=c", null, "a", "b=c"); // test equals in param value
testOverride(configuration, "a b:c d=e f g", "a b", "c d", "e f g"); // test spaces
// Any input that doesn't contain an equals should fail
testParseFailure(configuration, "a");
testParseFailure(configuration, "a:b");
testParseFailure(configuration, "a:b:c");
testParseFailure(configuration, "=c");
}
private void testOverride(final BootstrapConfiguration configuration, final String argument, final String contextName, final String parameterName, final String parameterValue) {
final ParameterOverride override = configuration.parseOverride(argument);
assertEquals(contextName, override.getContextName());
assertEquals(parameterName, override.getParameterName());
assertEquals(parameterValue, override.getParameterValue());
}
private void testParseFailure(final BootstrapConfiguration configuration, final String argument) {
try {
configuration.parseOverride(argument);
Assert.fail("Expected an IllegalArgumentException");
} catch (final IllegalArgumentException expected) {
}
}
}

View File

@ -20,11 +20,13 @@ import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
import org.apache.nifi.stateless.parameter.ParameterProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -43,7 +45,7 @@ public class ExtensionDiscovery {
final Set<Bundle> narBundles = narClassLoaders.getBundles();
final long discoveryStart = System.nanoTime();
final StandardExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
final StandardExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager(Collections.singleton(ParameterProvider.class));
extensionManager.discoverExtensions(narBundles);
final long discoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - discoveryStart);

View File

@ -36,6 +36,8 @@ import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
import org.apache.nifi.stateless.flow.StandardDataflowDefinition;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.apache.nifi.stateless.parameter.EnvironmentVariableParameterProvider;
import org.apache.nifi.stateless.parameter.ParameterOverrideProvider;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,6 +54,7 @@ import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@ -71,11 +74,14 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
// After the name of the parameter context, it may or may not have a ".<parameter name>" component, then an equals (=) and a value.
private static final Pattern PARAMETER_CONTEXT_PATTERN = Pattern.compile("\\Qnifi.stateless.parameters.\\E(.*?)(\\..*)?");
private static final Pattern REPORTING_TASK_PATTERN = Pattern.compile("\\Qnifi.stateless.reporting.task.\\E(.*?)\\.(.*)");
private static final String PARAMETER_PROVIDER_PREFIX = "nifi.stateless.parameter.provider.";
private static final Pattern PARAMETER_PROVIDER_PATTERN = Pattern.compile("\\Q" + PARAMETER_PROVIDER_PREFIX + "\\E(.*?)\\.(.*)");
// Any property value of the form env{...} can be used to reference an environment variable. For example, env{ABC} references the ABC environment variable.
private static final Pattern ENV_VARIABLE_PATTERN = Pattern.compile("env\\{(.*)}");
// Property names/keys
private static final String PROPERTIES_PREFIX = "properties.";
private static final String FAILURE_PORTS_KEY = "nifi.stateless.failure.port.names";
private static final String REGISTRY_URL_KEY = "nifi.stateless.registry.url";
private static final String BUCKET_ID_KEY = "nifi.stateless.flow.bucketId";
@ -91,14 +97,14 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
private static final String TRANSACTION_THRESHOLD_TIME = "nifi.stateless.transaction.thresholds.time";
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig)
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List<ParameterOverride> parameterOverrides)
throws IOException, StatelessConfigurationException {
final Map<String, String> properties = readPropertyValues(propertiesFile);
return parseFlowDefinition(properties, engineConfig);
return parseFlowDefinition(properties, engineConfig, parameterOverrides);
}
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final Map<String, String> properties, final StatelessEngineConfiguration engineConfig)
throws IOException, StatelessConfigurationException {
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final Map<String, String> properties, final StatelessEngineConfiguration engineConfig,
final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
// A common problem is users accidentally including whitespace at the beginning or end of property values.
// We can't just blindly trim the white space because it may be relevant. For example, there may be a Parameter
@ -110,6 +116,7 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
final VersionedFlowSnapshot flowSnapshot = fetchVersionedFlowSnapshot(properties, engineConfig.getSslContext());
final List<ParameterContextDefinition> parameterContextDefinitions = getParameterContexts(properties);
final List<ReportingTaskDefinition> reportingTaskDefinitions = getReportingTasks(properties);
final List<ParameterProviderDefinition> parameterProviderDefinitions = getParameterProviders(properties, parameterOverrides);
final TransactionThresholds transactionThresholds = getTransactionThresholds(properties);
final String rootGroupName = flowSnapshot.getFlowContents().getName();
@ -121,6 +128,7 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
.failurePortNames(failurePortNames)
.parameterContexts(parameterContextDefinitions)
.reportingTasks(reportingTaskDefinitions)
.parameterProviders(parameterProviderDefinitions)
.transactionThresholds(transactionThresholds)
.build();
}
@ -174,6 +182,97 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
return new ArrayList<>(reportingTaskDefinitions.values());
}
private List<ParameterProviderDefinition> getParameterProviders(final Map<String, String> properties, final List<ParameterOverride> parameterOverrides) {
final Map<String, ParameterProviderDefinition> parameterProviderDefinitions = new LinkedHashMap<>();
parameterProviderDefinitions.put("Default Parameter Override Provider", createParameterOverrideProvider(parameterOverrides));
parameterProviderDefinitions.put("Default Environment Variable Provider", createEnvironmentVariableProvider());
for (final String propertyName : properties.keySet()) {
final Matcher matcher = PARAMETER_PROVIDER_PATTERN.matcher(propertyName);
if (!matcher.matches()) {
continue;
}
// For a property name like:
// nifi.stateless.parameter.provider.abc.name=hello
// We consider 'abc' the <parameter provider key> and 'name' the <relative property name>
final String parameterProviderKey = matcher.group(1);
final ParameterProviderDefinition definition = parameterProviderDefinitions.computeIfAbsent(parameterProviderKey, key -> new ParameterProviderDefinition());
definition.setName(parameterProviderKey);
final String relativePropertyName = matcher.group(2);
final String propertyValue = properties.get(propertyName);
if (relativePropertyName.startsWith(PROPERTIES_PREFIX)) {
if (relativePropertyName.length() <= PROPERTIES_PREFIX.length()) {
logger.warn("Encountered unexpected property <" + propertyName + "> in flow definition. This property will be ignored.");
continue;
}
final String providerPropertyName = relativePropertyName.substring(PROPERTIES_PREFIX.length());
definition.getPropertyValues().put(providerPropertyName, propertyValue);
} else {
switch (relativePropertyName) {
case "name":
definition.setName(propertyValue);
break;
case "type":
definition.setType(propertyValue);
break;
case "bundle":
definition.setBundleCoordinates(propertyValue);
break;
default:
logger.warn("Encountered unexpected property <" + propertyName + "> in flow definition. This property will be ignored.");
break;
}
}
}
// Validate that all providers have the required necessary information
for (final Map.Entry<String, ParameterProviderDefinition> entry : parameterProviderDefinitions.entrySet()) {
final String providerKey = entry.getKey();
final ParameterProviderDefinition definition = entry.getValue();
if (definition.getName() == null) {
logger.warn("Parameter Provider identified in Properties with key <" + providerKey + "> was not provided a name. Will default name to <" + providerKey + ">");
definition.setName(providerKey);
}
if (definition.getType() == null) {
throw new IllegalArgumentException("Parameter Provider <" + definition.getName() + "> does not have a Type set. This must be set by adding a property named " +
PARAMETER_PROVIDER_PREFIX + providerKey + ".type");
}
}
return new ArrayList<>(parameterProviderDefinitions.values());
}
private ParameterProviderDefinition createEnvironmentVariableProvider() {
final ParameterProviderDefinition overrideProvider = new ParameterProviderDefinition();
overrideProvider.setType(EnvironmentVariableParameterProvider.class.getName());
overrideProvider.setName("Environment Variable Parameter Provider");
overrideProvider.setPropertyValues(Collections.emptyMap());
return overrideProvider;
}
private ParameterProviderDefinition createParameterOverrideProvider(final List<ParameterOverride> parameterOverrides) {
final ParameterProviderDefinition overrideProvider = new ParameterProviderDefinition();
overrideProvider.setType(ParameterOverrideProvider.class.getName());
overrideProvider.setName("Parameter Override Provider");
final Map<String, String> propertyValues = new LinkedHashMap<>();
for (final ParameterOverride override : parameterOverrides) {
final String contextName = override.getContextName();
final String parameterName = override.getParameterName();
final String propertyName = contextName == null ? parameterName : contextName + ":" + parameterName;
propertyValues.put(propertyName, override.getParameterValue());
}
overrideProvider.setPropertyValues(propertyValues);
return overrideProvider;
}
private List<ParameterContextDefinition> getParameterContexts(final Map<String, String> properties) {
final Map<String, ParameterContextDefinition> contextDefinitions = new LinkedHashMap<>();

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.service.StandardPropertyContext;
import org.apache.nifi.stateless.parameter.ParameterProvider;
import org.apache.nifi.stateless.parameter.ParameterProviderInitializationContext;
import java.util.LinkedHashMap;
import java.util.Map;
public class StandardParameterProviderInitializationContext extends StandardPropertyContext implements ParameterProviderInitializationContext {
private final String identifier;
public StandardParameterProviderInitializationContext(final ParameterProvider parameterProvider, final Map<String, String> propertyValues, final String identifier) {
super(createPropertyMap(parameterProvider, propertyValues), parameterProvider);
this.identifier = identifier;
}
private static Map<PropertyDescriptor, String> createPropertyMap(final ParameterProvider provider, final Map<String, String> propertyValues) {
final Map<PropertyDescriptor, String> propertyMap = new LinkedHashMap<>();
for (final Map.Entry<String, String> entry : propertyValues.entrySet()) {
final PropertyDescriptor descriptor = provider.getPropertyDescriptor(entry.getKey());
propertyMap.put(descriptor, entry.getValue());
}
return propertyMap;
}
@Override
public String getIdentifier() {
return identifier;
}
}

View File

@ -21,12 +21,16 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager;
@ -41,9 +45,12 @@ import org.apache.nifi.extensions.ExtensionRepository;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionDefinition;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
@ -53,21 +60,27 @@ import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stateless.config.ConfigurableExtensionDefinition;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterDefinition;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.ParameterProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StandardStatelessFlow;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.parameter.CompositeParameterProvider;
import org.apache.nifi.stateless.parameter.ParameterProvider;
import org.apache.nifi.stateless.parameter.ParameterProviderInitializationContext;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -138,7 +151,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
}
@Override
public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition, final ParameterProvider parameterProvider) {
public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
if (!this.initialized) {
throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine");
}
@ -159,6 +172,8 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true);
final ParameterProvider parameterProvider = createParameterProvider(dataflowDefinition);
// Map existing parameter contexts by name
final Set<ParameterContext> parameterContexts = flowManager.getParameterContextManager().getParameterContexts();
final Map<String, ParameterContext> parameterContextMap = parameterContexts.stream()
@ -182,6 +197,98 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return dataflow;
}
private ParameterProvider createParameterProvider(final DataflowDefinition<?> dataflowDefinition) {
// Create a Provider for each definition
final List<ParameterProvider> providers = new ArrayList<>();
for (final ParameterProviderDefinition definition : dataflowDefinition.getParameterProviderDefinitions()) {
providers.add(createParameterProvider(definition));
}
// Create a Composite Parameter Provider that wraps all of the others.
final CompositeParameterProvider provider = new CompositeParameterProvider(providers);
final ParameterProviderInitializationContext initializationContext = new StandardParameterProviderInitializationContext(provider, Collections.emptyMap(), UUID.randomUUID().toString());
provider.initialize(initializationContext);
return provider;
}
private ParameterProvider createParameterProvider(final ParameterProviderDefinition definition) {
final BundleCoordinate bundleCoordinate = determineBundleCoordinate(definition, "Parameter Provider");
final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
if (bundle == null) {
throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
}
final String providerType = definition.getType();
final String providerId = UUID.randomUUID().toString();
final InstanceClassLoader classLoader = extensionManager.createInstanceClassLoader(providerType, providerId, bundle, Collections.emptySet());
try {
final Class<?> rawClass = Class.forName(providerType, true, classLoader);
Thread.currentThread().setContextClassLoader(classLoader);
final ParameterProvider parameterProvider = (ParameterProvider) rawClass.newInstance();
// Initialize the provider
final Map<String, String> properties = resolveProperties(definition.getPropertyValues(), parameterProvider, parameterProvider.getPropertyDescriptors());
final ParameterProviderInitializationContext initializationContext = new StandardParameterProviderInitializationContext(parameterProvider, properties, providerId);
parameterProvider.initialize(initializationContext);
// Ensure that the Parameter Provider is valid.
final List<ValidationResult> validationResults = validate(parameterProvider, properties, providerId);
if (!validationResults.isEmpty()) {
throw new IllegalStateException("Parameter Provider with name <" + definition.getName() + "> is not valid: " + validationResults);
}
return parameterProvider;
} catch (final Exception e) {
throw new IllegalStateException("Could not create Parameter Provider " + definition.getName() + " of type " + definition.getType(), e);
}
}
private List<ValidationResult> validate(final ConfigurableComponent component, final Map<String, String> properties, final String componentId) {
final Map<PropertyDescriptor, PropertyConfiguration> explicitlyConfiguredPropertyMap = new HashMap<>();
for (final Map.Entry<String, String> property : properties.entrySet()) {
final String propertyName = property.getKey();
final String propertyValue = property.getValue();
final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName);
final PropertyConfiguration propertyConfiguration = new PropertyConfiguration(propertyValue, null, Collections.emptyList());
explicitlyConfiguredPropertyMap.put(descriptor, propertyConfiguration);
}
final Map<PropertyDescriptor, PropertyConfiguration> fullPropertyMap = buildConfiguredAndDefaultPropertyMap(component, explicitlyConfiguredPropertyMap);
final ValidationContext validationContext = new StandardValidationContext(controllerServiceProvider, fullPropertyMap,
null, null, componentId, VariableRegistry.EMPTY_REGISTRY, null);
final Collection<ValidationResult> validationResults = component.validate(validationContext);
return validationResults.stream()
.filter(validationResult -> !validationResult.isValid())
.collect(Collectors.toList());
}
public Map<PropertyDescriptor, PropertyConfiguration> buildConfiguredAndDefaultPropertyMap(final ConfigurableComponent component, final Map<PropertyDescriptor, PropertyConfiguration> properties) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, component.getClass(), component.getIdentifier())) {
final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
if (supported == null || supported.isEmpty()) {
return Collections.unmodifiableMap(properties);
} else {
final Map<PropertyDescriptor, PropertyConfiguration> props = new LinkedHashMap<>();
for (final PropertyDescriptor descriptor : supported) {
props.put(descriptor, null);
}
props.putAll(properties);
return props;
}
}
}
private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents();
final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group);
@ -195,6 +302,15 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
requiredBundles.add(coordinate);
}
for (final ParameterProviderDefinition parameterProviderDefinition : dataflowDefinition.getParameterProviderDefinitions()) {
final BundleCoordinate coordinate = parseBundleCoordinate(parameterProviderDefinition);
if (coordinate == null) {
continue;
}
requiredBundles.add(coordinate);
}
final ExecutorService executor = new FlowEngine(CONCURRENT_EXTENSION_DOWNLOADS, "Download Extensions", true);
final Future<Set<Bundle>> future = extensionRepository.fetch(requiredBundles, executor, CONCURRENT_EXTENSION_DOWNLOADS);
executor.shutdown();
@ -243,18 +359,27 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
}
private ReportingTaskNode createReportingTask(final ReportingTaskDefinition taskDefinition) {
final BundleCoordinate bundleCoordinate = determineBundleCoordinate(taskDefinition);
final BundleCoordinate bundleCoordinate = determineBundleCoordinate(taskDefinition, "Reporting Task");
final ReportingTaskNode taskNode = flowManager.createReportingTask(taskDefinition.getType(), UUID.randomUUID().toString(), bundleCoordinate, Collections.emptySet(), true, true);
taskNode.setProperties(resolveProperties(taskDefinition.getPropertyValues(), taskNode));
final Map<String, String> properties = resolveProperties(taskDefinition.getPropertyValues(), taskNode.getComponent(), taskNode.getProperties().keySet());
taskNode.setProperties(properties);
taskNode.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
taskNode.setSchedulingPeriod(taskDefinition.getSchedulingFrequency());
// Ensure that the Parameter Provider is valid.
final List<ValidationResult> validationResults = validate(taskNode.getComponent(), properties, taskNode.getIdentifier());
if (!validationResults.isEmpty()) {
throw new IllegalStateException("Reporting Task with name <" + taskNode.getName() + "> is not valid: " + validationResults);
}
return taskNode;
}
private Map<String, String> resolveProperties(final Map<String, String> configured, final ReportingTaskNode taskNode) {
private Map<String, String> resolveProperties(final Map<String, String> configured, final ConfigurableComponent component, final Collection<PropertyDescriptor> componentDescriptors) {
// Map property display name to actual names.
final Map<String, String> displayNameToActualName = new HashMap<>();
for (final PropertyDescriptor descriptor : taskNode.getProperties().keySet()) {
for (final PropertyDescriptor descriptor : componentDescriptors) {
displayNameToActualName.put(descriptor.getDisplayName(), descriptor.getName());
}
@ -271,13 +396,13 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
String resolvedValue = configuredValue;
if (actual != null) {
// This is a 'known' / non-dynamic property
final PropertyDescriptor descriptor = taskNode.getPropertyDescriptor(actual);
final PropertyDescriptor descriptor = component.getPropertyDescriptor(actual);
final List<AllowableValue> allowableValues = descriptor.getAllowableValues();
if (allowableValues != null && !allowableValues.isEmpty()) {
for (final AllowableValue allowableValue : allowableValues) {
if (allowableValue.getDisplayName().equalsIgnoreCase(configuredValue)) {
resolvedValue = allowableValue.getValue();
logger.debug("Resolving property value of {} for {} of {} to {}", configuredValue, configuredName, taskNode, resolvedValue);
logger.debug("Resolving property value of {} for {} of {} to {}", configuredValue, configuredName, component, resolvedValue);
break;
}
}
@ -291,38 +416,39 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return resolved;
}
private BundleCoordinate determineBundleCoordinate(final ReportingTaskDefinition taskDefinition) {
final String explicitCoordinates = taskDefinition.getBundleCoordinates();
if (explicitCoordinates != null && !explicitCoordinates.trim().isEmpty()) {
final String resolvedClassName = resolveReportingTaskClassName(taskDefinition);
taskDefinition.setType(resolvedClassName);
final BundleCoordinate coordinate = parseBundleCoordinate(taskDefinition);
private BundleCoordinate determineBundleCoordinate(final ConfigurableExtensionDefinition extensionDefinition, final String extensionType) {
final String explicitCoordinates = extensionDefinition.getBundleCoordinates();
if (explicitCoordinates != null && !explicitCoordinates.trim().isEmpty()) {
final String resolvedClassName = resolveExtensionClassName(extensionDefinition, extensionType);
extensionDefinition.setType(resolvedClassName);
final BundleCoordinate coordinate = parseBundleCoordinate(extensionDefinition);
return coordinate;
}
final String specifiedType = taskDefinition.getType();
final String specifiedType = extensionDefinition.getType();
String resolvedClassName = specifiedType;
if (!specifiedType.contains(".")) {
final List<Bundle> possibleBundles = extensionManager.getBundles(taskDefinition.getType());
final List<Bundle> possibleBundles = extensionManager.getBundles(extensionDefinition.getType());
if (possibleBundles.isEmpty()) {
logger.debug("Could not find Reporting Task type of <{}>. Will try to find matching Reporting Task type based on class name", specifiedType);
logger.debug("Could not find extension type of <{}>. Will try to find matching Reporting Task type based on class name", specifiedType);
resolvedClassName = resolveReportingTaskClassName(taskDefinition);
taskDefinition.setType(resolvedClassName);
logger.info("Resolved Reporting Task class {} to {}", specifiedType, resolvedClassName);
resolvedClassName = resolveExtensionClassName(extensionDefinition, extensionType);
extensionDefinition.setType(resolvedClassName);
logger.info("Resolved extension class {} to {}", specifiedType, resolvedClassName);
}
}
final List<Bundle> possibleBundles = extensionManager.getBundles(resolvedClassName);
if (possibleBundles.isEmpty()) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() +
") does not specify a Bundle and no Bundles could be found for type " + taskDefinition.getType());
throw new IllegalArgumentException("Extension '" + extensionDefinition.getName() + "' (" + extensionDefinition.getType() +
") does not specify a Bundle and no Bundles could be found for type " + extensionDefinition.getType());
}
if (possibleBundles.size() > 1) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() +
") does not specify a Bundle and multiple Bundles exist for this type. The reporting task must specify a bundle to use.");
throw new IllegalArgumentException("Extension '" + extensionDefinition.getName() + "' (" + extensionDefinition.getType() +
") does not specify a Bundle and multiple Bundles exist for this type. The extension must specify a bundle to use.");
}
final Bundle bundle = possibleBundles.get(0);
@ -330,23 +456,24 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return coordinate;
}
private BundleCoordinate parseBundleCoordinate(final ReportingTaskDefinition taskDefinition) {
final String specifiedCoordinates = taskDefinition.getBundleCoordinates();
private BundleCoordinate parseBundleCoordinate(final ConfigurableExtensionDefinition extensionDefinition) {
final String specifiedCoordinates = extensionDefinition.getBundleCoordinates();
if (specifiedCoordinates == null) {
return null;
}
final String[] splits = specifiedCoordinates.split(":", 3);
if (splits.length != 3) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() + ") specifies bundle as '" + specifiedCoordinates + "', but this " +
"is not a valid Bundle format. Format should be <group>:<id>:<version>");
throw new IllegalArgumentException("Reporting Task '" + extensionDefinition.getName() + "' (" + extensionDefinition.getType() + ") specifies bundle as '" +
specifiedCoordinates + "', but this " + "is not a valid Bundle format. Format should be <group>:<id>:<version>");
}
return new BundleCoordinate(splits[0], splits[1], splits[2]);
}
private String resolveReportingTaskClassName(final ReportingTaskDefinition taskDefinition) {
final String specifiedType = taskDefinition.getType();
private String resolveExtensionClassName(final ConfigurableExtensionDefinition extensionDefinition, final String extensionType) {
final String specifiedType = extensionDefinition.getType();
if (specifiedType.contains(".")) {
return specifiedType;
}
@ -365,13 +492,13 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
}
if (possibleResolvedClassNames.isEmpty()) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() + ") does not specify a Bundle, and no Reporting Task" +
" implementations exist with a class name of " + taskDefinition.getType() + ".");
throw new IllegalArgumentException(String.format("%s '%s' (%s) does not specify a Bundle, and no %s implementations exist with a class name of %s.",
extensionType, extensionDefinition.getName(), extensionDefinition.getType(), extensionType, extensionDefinition.getType()));
}
if (possibleResolvedClassNames.size() > 1) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() + ") does not specify a Bundle, and no Reporting Task" +
" implementations exist with a class name of " + taskDefinition.getType() + ". Perhaps you meant one of: " + possibleResolvedClassNames);
throw new IllegalArgumentException(String.format("%s '%s' (%s) does not specify a Bundle, and no %s implementations exist with a class name of %s. Perhaps you meant one of: %s",
extensionType, extensionDefinition.getName(), extensionDefinition.getType(), extensionType, extensionDefinition.getType(), possibleResolvedClassNames));
}
return possibleResolvedClassNames.iterator().next();

View File

@ -32,7 +32,6 @@ import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StatelessDataflow;
@ -40,7 +39,7 @@ public interface StatelessEngine<T> {
void initialize(StatelessEngineInitializationContext initializationContext);
StatelessDataflow createFlow(DataflowDefinition<T> dataflowDefinition, ParameterProvider parameterProvider);
StatelessDataflow createFlow(DataflowDefinition<T> dataflowDefinition);
ExtensionManager getExtensionManager();

View File

@ -23,6 +23,7 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import java.util.Collections;
@ -37,6 +38,7 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
private final Set<String> failurePortNames;
private final List<ParameterContextDefinition> parameterContexts;
private final List<ReportingTaskDefinition> reportingTaskDefinitions;
private final List<ParameterProviderDefinition> parameterProviderDefinitions;
private final TransactionThresholds transactionThresholds;
private final String flowName;
@ -46,6 +48,7 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
parameterContexts = builder.parameterContexts == null ? Collections.emptyList() : builder.parameterContexts;
reportingTaskDefinitions = builder.reportingTaskDefinitions == null ? Collections.emptyList() : builder.reportingTaskDefinitions;
transactionThresholds = builder.transactionThresholds == null ? TransactionThresholds.SINGLE_FLOWFILE : builder.transactionThresholds;
parameterProviderDefinitions = builder.parameterProviderDefinitions == null ? Collections.emptyList() : builder.parameterProviderDefinitions;
flowName = builder.flowName;
}
@ -74,6 +77,11 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
return reportingTaskDefinitions;
}
@Override
public List<ParameterProviderDefinition> getParameterProviderDefinitions() {
return parameterProviderDefinitions;
}
@Override
public TransactionThresholds getTransactionThresholds() {
return transactionThresholds;
@ -105,6 +113,7 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
private Set<String> failurePortNames;
private List<ParameterContextDefinition> parameterContexts;
private List<ReportingTaskDefinition> reportingTaskDefinitions;
private List<ParameterProviderDefinition> parameterProviderDefinitions;
private TransactionThresholds transactionThresholds;
private String flowName;
@ -133,6 +142,11 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
return this;
}
public Builder parameterProviders(final List<ParameterProviderDefinition> parameterProviders) {
this.parameterProviderDefinitions = parameterProviders;
return this;
}
public Builder transactionThresholds(final TransactionThresholds thresholds) {
this.transactionThresholds = thresholds;
return this;

View File

@ -54,7 +54,6 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.security.util.EncryptionMethod;
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.SslConfigurationUtil;
import org.apache.nifi.stateless.config.SslContextDefinition;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
@ -92,8 +91,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
private static final EncryptionMethod ENCRYPTION_METHOD = EncryptionMethod.MD5_256AES;
@Override
public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
final ParameterProvider parameterProvider) throws IOException, StatelessConfigurationException {
public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition)
throws IOException, StatelessConfigurationException {
final long start = System.currentTimeMillis();
final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
@ -221,7 +220,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
rootGroup.setName("root");
flowManager.setRootGroup(rootGroup);
final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition, parameterProvider);
final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition);
final long millis = System.currentTimeMillis() - start;
logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);

View File

@ -218,7 +218,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
// Create executor for dataflow
final String flowName = dataflowDefinition.getFlowName();
final String threadName = (flowName == null) ? "Run Dataflow" : "Run Dataflow " + flowName;
final String threadName = (flowName == null || flowName.trim().isEmpty()) ? "Run Dataflow" : "Run Dataflow " + flowName;
runDataflowExecutor = Executors.newFixedThreadPool(1, createNamedThreadFactory(threadName, false));
// Periodically log component statuses
@ -354,7 +354,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
throw new IllegalStateException("Controller Service " + serviceNode + " has not fully enabled. Current Validation Status is "
+ serviceNode.getValidationStatus() + " with validation Errors: " + serviceNode.getValidationErrors());
+ serviceNode.getValidationStatus() + " with validation Errors: " + serviceNode.getValidationErrors(), e);
}
}

View File

@ -15,14 +15,12 @@
* limitations under the License.
*/
package org.apache.nifi.stateless.bootstrap;
import org.apache.nifi.stateless.config.ParameterProvider;
package org.apache.nifi.stateless.parameter;
import java.util.ArrayList;
import java.util.List;
public class CompositeParameterProvider implements ParameterProvider {
public class CompositeParameterProvider extends AbstractParameterProvider implements ParameterProvider {
private final List<ParameterProvider> parameterProviders;
public CompositeParameterProvider(final List<ParameterProvider> providers) {
@ -32,6 +30,10 @@ public class CompositeParameterProvider implements ParameterProvider {
@Override
public String getParameterValue(final String contextName, final String parameterName) {
for (final ParameterProvider provider : parameterProviders) {
if (!provider.isParameterDefined(contextName, parameterName)) {
continue;
}
final String value = provider.getParameterValue(contextName, parameterName);
if (value != null) {
return value;

View File

@ -15,13 +15,11 @@
* limitations under the License.
*/
package org.apache.nifi.stateless.bootstrap;
import org.apache.nifi.stateless.config.ParameterProvider;
package org.apache.nifi.stateless.parameter;
import java.util.Map;
public class EnvironmentVariableParameterProvider implements ParameterProvider {
public class EnvironmentVariableParameterProvider extends AbstractParameterProvider implements ParameterProvider {
private final Map<String, String> environmentVariables = System.getenv();
@Override

View File

@ -15,19 +15,56 @@
* limitations under the License.
*/
package org.apache.nifi.stateless.bootstrap;
package org.apache.nifi.stateless.parameter;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.ParameterProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ParameterOverrideProvider implements ParameterProvider {
private final List<ParameterOverride> parameterOverrides;
public class ParameterOverrideProvider extends AbstractParameterProvider implements ParameterProvider {
// Effectively final
private List<ParameterOverride> parameterOverrides;
public ParameterOverrideProvider(final List<ParameterOverride> overrides) {
this.parameterOverrides = overrides;
@Override
public void init(final ParameterProviderInitializationContext context) {
parameterOverrides = parseConfiguration(context);
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(Validator.VALID)
.build();
}
private List<ParameterOverride> parseConfiguration(final ParameterProviderInitializationContext context) {
final List<ParameterOverride> overrides = new ArrayList<>();
final Map<String, String> properties = context.getAllProperties();
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String propertyName = entry.getKey();
final String propertyValue = entry.getValue();
final ParameterOverride override;
if (propertyName.contains(":")) {
final String[] splits = propertyName.split(":", 2);
final String contextName = splits[0];
final String parameterName = splits[1];
override = new ParameterOverride(contextName, parameterName, propertyValue);
} else {
override = new ParameterOverride(propertyName, propertyValue);
}
overrides.add(override);
}
return overrides;
}
@Override

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.stateless.parameter.EnvironmentVariableParameterProvider
org.apache.nifi.stateless.parameter.ParameterOverrideProvider

View File

@ -25,6 +25,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -40,7 +41,9 @@ public class TestPropertiesFileFlowDefinitionParser {
public void testParse() throws IOException, StatelessConfigurationException {
final PropertiesFileFlowDefinitionParser parser = new PropertiesFileFlowDefinitionParser();
final DataflowDefinition dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), createStatelessEngineConfiguration());
final List<ParameterOverride> parameterOverrides = new ArrayList<>();
final StatelessEngineConfiguration engineConfig = createStatelessEngineConfiguration();
final DataflowDefinition<?> dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), engineConfig, parameterOverrides);
assertEquals(new HashSet<>(Arrays.asList("foo", "bar", "baz")), dataflowDefinition.getFailurePortNames());
final List<ParameterContextDefinition> contextDefinitions = dataflowDefinition.getParameterContexts();

View File

@ -20,10 +20,10 @@ package org.apache.nifi.stateless;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.stateless.bootstrap.EmptyParameterProvider;
import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import org.apache.nifi.stateless.config.SslContextDefinition;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
@ -137,8 +137,14 @@ public class StatelessSystemIT {
protected StatelessDataflow loadDataflow(final VersionedFlowSnapshot versionedFlowSnapshot, final List<ParameterContextDefinition> parameterContexts, final Set<String> failurePortNames,
final TransactionThresholds transactionThresholds) throws IOException, StatelessConfigurationException {
return loadDataflow(versionedFlowSnapshot, parameterContexts, Collections.emptyList(), failurePortNames, transactionThresholds);
}
final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition = new DataflowDefinition<VersionedFlowSnapshot>() {
protected StatelessDataflow loadDataflow(final VersionedFlowSnapshot versionedFlowSnapshot, final List<ParameterContextDefinition> parameterContexts,
final List<ParameterProviderDefinition> parameterProviderDefinitions, final Set<String> failurePortNames,
final TransactionThresholds transactionThresholds) throws IOException, StatelessConfigurationException {
final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition = new DataflowDefinition<VersionedFlowSnapshot>() {
@Override
public VersionedFlowSnapshot getFlowSnapshot() {
return versionedFlowSnapshot;
@ -161,7 +167,12 @@ public class StatelessSystemIT {
@Override
public List<ReportingTaskDefinition> getReportingTaskDefinitions() {
return Collections.emptyList();
return Collections.emptyList();
}
@Override
public List<ParameterProviderDefinition> getParameterProviderDefinitions() {
return parameterProviderDefinitions;
}
@Override
@ -171,7 +182,7 @@ public class StatelessSystemIT {
};
final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(getEngineConfiguration());
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition, new EmptyParameterProvider());
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();
createdFlows.add(dataflow);

View File

@ -29,10 +29,13 @@ import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterDefinition;
import org.apache.nifi.stateless.config.ParameterProviderDefinition;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@ -47,6 +50,170 @@ import static org.junit.Assert.assertTrue;
public class ParameterContextIT extends StatelessSystemIT {
@Test
public void testCustomParameterProvider() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
generate.setProperties(Collections.singletonMap("Batch Size", "#{three}"));
flowBuilder.createConnection(generate, outPort, "success");
final VersionedFlowSnapshot flowSnapshot = flowBuilder.getFlowSnapshot();
// Define the Parameter Context to use
final ParameterProviderDefinition numericParameterProvider = new ParameterProviderDefinition();
numericParameterProvider.setName("Numeric Parameter Provider");
numericParameterProvider.setType("org.apache.nifi.stateless.parameters.NumericParameterProvider");
final List<ParameterProviderDefinition> parameterProviders = Collections.singletonList(numericParameterProvider);
// Create a Parameter Context & set it on the root group.
final VersionedParameterContext parameterContext = flowBuilder.createParameterContext("Context 1");
parameterContext.getParameters().add(createVersionedParameter("three", "-1")); // Set value to -1. This should be overridden by the Numeric Parameter Context.
flowBuilder.getRootGroup().setParameterContextName("Context 1");
// Startup the dataflow
final StatelessDataflow dataflow = loadDataflow(flowSnapshot, Collections.emptyList(), parameterProviders, Collections.emptySet(), TransactionThresholds.SINGLE_FLOWFILE);
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
final List<FlowFile> outputFlowFiles = result.getOutputFlowFiles().get("Out");
assertEquals(3, outputFlowFiles.size());
result.acknowledge();
}
@Test
public void testInvalidParameterProvider() {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
generate.setProperties(Collections.singletonMap("Batch Size", "#{three}"));
flowBuilder.createConnection(generate, outPort, "success");
final VersionedFlowSnapshot flowSnapshot = flowBuilder.getFlowSnapshot();
// Define the Parameter Context to use
final ParameterProviderDefinition numericParameterProvider = new ParameterProviderDefinition();
numericParameterProvider.setName("Invalid Parameter Provider");
numericParameterProvider.setType("org.apache.nifi.stateless.parameters.InvalidParameterProvider");
final List<ParameterProviderDefinition> parameterProviders = Collections.singletonList(numericParameterProvider);
// Create a Parameter Context & set it on the root group.
final VersionedParameterContext parameterContext = flowBuilder.createParameterContext("Context 1");
parameterContext.getParameters().add(createVersionedParameter("three", "-1")); // Set value to -1. This should be overridden by the Numeric Parameter Context.
flowBuilder.getRootGroup().setParameterContextName("Context 1");
Assert.assertThrows(IllegalStateException.class, () -> {
loadDataflow(flowSnapshot, Collections.emptyList(), parameterProviders, Collections.emptySet(), TransactionThresholds.SINGLE_FLOWFILE);
});
}
@Test
public void testParameterProviderWithRequiredPropertyNotSet() throws IOException, StatelessConfigurationException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
generate.setProperties(Collections.singletonMap("Batch Size", "#{three}"));
flowBuilder.createConnection(generate, outPort, "success");
final VersionedFlowSnapshot flowSnapshot = flowBuilder.getFlowSnapshot();
// Define the Parameter Context to use
final ParameterProviderDefinition numericParameterProvider = new ParameterProviderDefinition();
numericParameterProvider.setName("Parameter Provider With Properties");
numericParameterProvider.setType("org.apache.nifi.stateless.parameters.ParameterProviderWithProperties");
final List<ParameterProviderDefinition> parameterProviders = Collections.singletonList(numericParameterProvider);
// Create a Parameter Context & set it on the root group.
final VersionedParameterContext parameterContext = flowBuilder.createParameterContext("Context 1");
parameterContext.getParameters().add(createVersionedParameter("three", "1")); // Set value to -1. This should be overridden by the Numeric Parameter Context.
flowBuilder.getRootGroup().setParameterContextName("Context 1");
Assert.assertThrows(IllegalStateException.class, () -> {
loadDataflow(flowSnapshot, Collections.emptyList(), parameterProviders, Collections.emptySet(), TransactionThresholds.SINGLE_FLOWFILE);
});
}
@Test
public void testParameterProviderWithRequiredPropertySet() throws IOException, StatelessConfigurationException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
generate.setProperties(Collections.singletonMap("Batch Size", "#{three}"));
flowBuilder.createConnection(generate, outPort, "success");
final VersionedFlowSnapshot flowSnapshot = flowBuilder.getFlowSnapshot();
// Define the Parameter Context to use
final ParameterProviderDefinition numericParameterProvider = new ParameterProviderDefinition();
numericParameterProvider.setName("Parameter Provider With Properties");
numericParameterProvider.setType("org.apache.nifi.stateless.parameters.ParameterProviderWithProperties");
numericParameterProvider.setPropertyValues(Collections.singletonMap("Required", "Hello"));
final List<ParameterProviderDefinition> parameterProviders = Collections.singletonList(numericParameterProvider);
// Create a Parameter Context & set it on the root group.
final VersionedParameterContext parameterContext = flowBuilder.createParameterContext("Context 1");
parameterContext.getParameters().add(createVersionedParameter("three", "1")); // Set value to -1. This should be overridden by the Numeric Parameter Context.
flowBuilder.getRootGroup().setParameterContextName("Context 1");
loadDataflow(flowSnapshot, Collections.emptyList(), parameterProviders, Collections.emptySet(), TransactionThresholds.SINGLE_FLOWFILE);
}
@Test
public void testParameterProviderCanAccessPropertyValues() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
generate.setProperties(Collections.singletonMap("Batch Size", "#{Required}"));
flowBuilder.createConnection(generate, outPort, "success");
final VersionedFlowSnapshot flowSnapshot = flowBuilder.getFlowSnapshot();
// Define the Parameter Context to use
final Map<String, String> providerProperties = new HashMap<>();
providerProperties.put("Required", "3");
providerProperties.put("Optional", "7");
final ParameterProviderDefinition numericParameterProvider = new ParameterProviderDefinition();
numericParameterProvider.setName("Parameter Provider With Properties");
numericParameterProvider.setType("org.apache.nifi.stateless.parameters.ParameterProviderWithProperties");
numericParameterProvider.setPropertyValues(providerProperties);
final List<ParameterProviderDefinition> parameterProviders = Collections.singletonList(numericParameterProvider);
// Create a Parameter Context & set it on the root group.
final VersionedParameterContext parameterContext = flowBuilder.createParameterContext("Context 1");
parameterContext.getParameters().add(createVersionedParameter("Required", "1")); // Set value to -1. This should be overridden by the Numeric Parameter Context.
parameterContext.getParameters().add(createVersionedParameter("Optional", "1")); // Set value to -1. This should be overridden by the Numeric Parameter Context.
flowBuilder.getRootGroup().setParameterContextName("Context 1");
final StatelessDataflow dataflowWithRequiredParam = loadDataflow(flowSnapshot, Collections.emptyList(), parameterProviders, Collections.emptySet(), TransactionThresholds.SINGLE_FLOWFILE);
final DataflowTrigger requiredTrigger = dataflowWithRequiredParam.trigger();
final TriggerResult requiredResult = requiredTrigger.getResult();
final List<FlowFile> requiredOutputFlowFiles = requiredResult.getOutputFlowFiles().get("Out");
assertEquals(3, requiredOutputFlowFiles.size());
requiredResult.acknowledge();
dataflowWithRequiredParam.shutdown();
// Test with Optional parameter referenced
generate.setProperties(Collections.singletonMap("Batch Size", "#{Optional}"));
final StatelessDataflow dataflowWithOptionalParam = loadDataflow(flowSnapshot, Collections.emptyList(), parameterProviders, Collections.emptySet(), TransactionThresholds.SINGLE_FLOWFILE);
final DataflowTrigger optionalTrigger = dataflowWithOptionalParam.trigger();
final TriggerResult optionalResult = optionalTrigger.getResult();
final List<FlowFile> optionalOutputFlowFiles = optionalResult.getOutputFlowFiles().get("Out");
assertEquals(7, optionalOutputFlowFiles.size());
optionalResult.acknowledge();
}
@Test
public void testMultipleParameterContexts() throws IOException, StatelessConfigurationException, InterruptedException {
// Build dataflow

View File

@ -29,6 +29,11 @@
<artifactId>nifi-api</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-stateless-api</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.parameters;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.stateless.parameter.AbstractParameterProvider;
import java.util.Collection;
import java.util.Collections;
public class InvalidParameterProvider extends AbstractParameterProvider {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final ValidationResult validationResult = new ValidationResult.Builder()
.valid(false)
.explanation("This Parameter Provider is never valid")
.build();
return Collections.singleton(validationResult);
}
@Override
public String getParameterValue(final String contextName, final String parameterName) {
return null;
}
@Override
public boolean isParameterDefined(final String contextName, final String parameterName) {
return false;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.parameters;
import org.apache.nifi.stateless.parameter.AbstractParameterProvider;
import java.util.HashMap;
import java.util.Map;
public class NumericParameterProvider extends AbstractParameterProvider {
private final Map<String, String> parameterValues = new HashMap<>();
{
parameterValues.put("zero", "0");
parameterValues.put("one", "1");
parameterValues.put("two", "2");
parameterValues.put("three", "3");
parameterValues.put("four", "4");
parameterValues.put("five", "5");
parameterValues.put("six", "6");
parameterValues.put("seven", "7");
parameterValues.put("eight", "8");
parameterValues.put("nine", "9");
}
@Override
public String getParameterValue(final String contextName, final String parameterName) {
return parameterValues.get(parameterName);
}
@Override
public boolean isParameterDefined(final String contextName, final String parameterName) {
return parameterValues.containsKey(parameterName);
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.parameters;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.stateless.parameter.AbstractParameterProvider;
import java.util.Arrays;
import java.util.List;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
public class ParameterProviderWithProperties extends AbstractParameterProvider {
static final PropertyDescriptor REQUIRED_PARAMETER = new PropertyDescriptor.Builder()
.name("Required")
.displayName("Required")
.description("A required parameter")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(NONE)
.build();
static final PropertyDescriptor OPTIONAL_PARAMETER = new PropertyDescriptor.Builder()
.name("Optional")
.displayName("Optional")
.description("An optional parameter")
.required(false)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(NONE)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(REQUIRED_PARAMETER, OPTIONAL_PARAMETER);
}
@Override
public String getParameterValue(final String contextName, final String parameterName) {
return getPropertyContext().getAllProperties().get(parameterName);
}
@Override
public boolean isParameterDefined(final String contextName, final String parameterName) {
return getPropertyContext().getAllProperties().containsKey(parameterName);
}
}

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.stateless.parameters.InvalidParameterProvider
org.apache.nifi.stateless.parameters.NumericParameterProvider
org.apache.nifi.stateless.parameters.ParameterProviderWithProperties