mirror of https://github.com/apache/nifi.git
NIFI-9402: Adding DatabaseParameterProvider
Adding provided scope to api dependency in nifi-standard-parameter-providers module Adding additional documentation, other minor code cleanup Correcting error handling in StandardParameterProviderNode, updating additional details for DatabaseParameterProvider Correcting null columm value handling NIFI-9402: Fixed Checkstyle violation Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #6391
This commit is contained in:
parent
013d01a9fc
commit
094222260c
|
@ -277,8 +277,8 @@ public class StandardParameterProviderNode extends AbstractComponentNode impleme
|
|||
List<ParameterGroup> fetchedParameterGroups;
|
||||
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), parameterProvider.getClass(), parameterProvider.getIdentifier())) {
|
||||
fetchedParameterGroups = parameterProvider.fetchParameters(configurationContext);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(String.format("Error fetching parameters for %s", this), e);
|
||||
} catch (final IOException | RuntimeException e) {
|
||||
throw new IllegalStateException(String.format("Error fetching parameters for %s: %s", this, e.getMessage()), e);
|
||||
}
|
||||
|
||||
if (fetchedParameterGroups == null || fetchedParameterGroups.isEmpty()) {
|
||||
|
|
|
@ -30,6 +30,17 @@
|
|||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-processors</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
|
|
|
@ -0,0 +1,265 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.parameter;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.dbcp.DBCPService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Tags({"database", "dbcp", "sql"})
|
||||
@CapabilityDescription("Fetches parameters from database tables")
|
||||
|
||||
public class DatabaseParameterProvider extends AbstractParameterProvider implements VerifiableParameterProvider {
|
||||
|
||||
protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
|
||||
|
||||
public static final PropertyDescriptor DB_TYPE;
|
||||
|
||||
static {
|
||||
// Load the DatabaseAdapters
|
||||
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
|
||||
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
|
||||
dbAdapterLoader.forEach(it -> {
|
||||
dbAdapters.put(it.getName(), it);
|
||||
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
|
||||
});
|
||||
|
||||
DB_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("db-type")
|
||||
.displayName("Database Type")
|
||||
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
|
||||
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
|
||||
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
|
||||
.defaultValue("Generic")
|
||||
.required(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
static AllowableValue GROUPING_BY_COLUMN = new AllowableValue("grouping-by-column", "Column",
|
||||
"A single table is partitioned by the 'Parameter Group Name Column'. All rows with the same value in this column will " +
|
||||
"map to a group of the same name.");
|
||||
static AllowableValue GROUPING_BY_TABLE_NAME = new AllowableValue("grouping-by-table-name", "Table Name",
|
||||
"An entire table maps to a Parameter Group. The group name will be the table name.");
|
||||
|
||||
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("dbcp-service")
|
||||
.displayName("Database Connection Pooling Service")
|
||||
.description("The Controller Service that is used to obtain a connection to the database.")
|
||||
.required(true)
|
||||
.identifiesControllerService(DBCPService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PARAMETER_GROUPING_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("parameter-grouping-strategy")
|
||||
.displayName("Parameter Grouping Strategy")
|
||||
.description("The strategy used to group parameters.")
|
||||
.required(true)
|
||||
.allowableValues(GROUPING_BY_COLUMN, GROUPING_BY_TABLE_NAME)
|
||||
.defaultValue(GROUPING_BY_COLUMN.getValue())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor TABLE_NAMES = new PropertyDescriptor.Builder()
|
||||
.name("table-names")
|
||||
.displayName("Table Names")
|
||||
.description("A comma-separated list of names of the database tables containing the parameters.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_TABLE_NAME)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
|
||||
.name("table-name")
|
||||
.displayName("Table Name")
|
||||
.description("The name of the database table containing the parameters.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PARAMETER_NAME_COLUMN = new PropertyDescriptor.Builder()
|
||||
.name("parameter-name-column")
|
||||
.displayName("Parameter Name Column")
|
||||
.description("The name of a column containing the parameter name.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PARAMETER_VALUE_COLUMN = new PropertyDescriptor.Builder()
|
||||
.name("parameter-value-column")
|
||||
.displayName("Parameter Value Column")
|
||||
.description("The name of a column containing the parameter value.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PARAMETER_GROUP_NAME_COLUMN = new PropertyDescriptor.Builder()
|
||||
.name("parameter-group-name-column")
|
||||
.displayName("Parameter Group Name Column")
|
||||
.description("The name of a column containing the name of the parameter group into which the parameter should be mapped.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SQL_WHERE_CLAUSE = new PropertyDescriptor.Builder()
|
||||
.name("sql-where-clause")
|
||||
.displayName("SQL WHERE clause")
|
||||
.description("A optional SQL query 'WHERE' clause by which to filter all results. The 'WHERE' keyword should not be included.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
@Override
|
||||
protected void init(final ParameterProviderInitializationContext config) {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(DB_TYPE);
|
||||
properties.add(DBCP_SERVICE);
|
||||
properties.add(PARAMETER_GROUPING_STRATEGY);
|
||||
properties.add(TABLE_NAME);
|
||||
properties.add(TABLE_NAMES);
|
||||
properties.add(PARAMETER_NAME_COLUMN);
|
||||
properties.add(PARAMETER_VALUE_COLUMN);
|
||||
properties.add(PARAMETER_GROUP_NAME_COLUMN);
|
||||
properties.add(SQL_WHERE_CLAUSE);
|
||||
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ParameterGroup> fetchParameters(final ConfigurationContext context) {
|
||||
final boolean groupByColumn = GROUPING_BY_COLUMN.getValue().equals(context.getProperty(PARAMETER_GROUPING_STRATEGY).getValue());
|
||||
|
||||
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
|
||||
final String whereClause = context.getProperty(SQL_WHERE_CLAUSE).getValue();
|
||||
final String parameterNameColumn = context.getProperty(PARAMETER_NAME_COLUMN).getValue();
|
||||
final String parameterValueColumn = context.getProperty(PARAMETER_VALUE_COLUMN).getValue();
|
||||
final String parameterGroupNameColumn = context.getProperty(PARAMETER_GROUP_NAME_COLUMN).getValue();
|
||||
|
||||
final List<String> tableNames = groupByColumn
|
||||
? Collections.singletonList(context.getProperty(TABLE_NAME).getValue())
|
||||
: Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).collect(Collectors.toList());
|
||||
|
||||
final Map<String, List<Parameter>> parameterMap = new HashMap<>();
|
||||
for (final String tableName : tableNames) {
|
||||
try (final Connection con = dbcpService.getConnection(Collections.emptyMap()); final Statement st = con.createStatement()) {
|
||||
final List<String> columns = new ArrayList<>();
|
||||
columns.add(parameterNameColumn);
|
||||
columns.add(parameterValueColumn);
|
||||
if (groupByColumn) {
|
||||
columns.add(parameterGroupNameColumn);
|
||||
}
|
||||
final String query = getQuery(context, tableName, columns, whereClause);
|
||||
|
||||
getLogger().info("Fetching parameters with query: " + query);
|
||||
try (final ResultSet rs = st.executeQuery(query)) {
|
||||
while (rs.next()) {
|
||||
final String parameterName = rs.getString(parameterNameColumn);
|
||||
final String parameterValue = rs.getString(parameterValueColumn);
|
||||
|
||||
validateValueNotNull(parameterName, parameterNameColumn);
|
||||
validateValueNotNull(parameterValue, parameterValueColumn);
|
||||
final String parameterGroupName;
|
||||
if (groupByColumn) {
|
||||
parameterGroupName = parameterGroupNameColumn == null ? null : rs.getString(parameterGroupNameColumn);
|
||||
validateValueNotNull(parameterGroupName, parameterGroupNameColumn);
|
||||
} else {
|
||||
parameterGroupName = tableName;
|
||||
}
|
||||
|
||||
final ParameterDescriptor parameterDescriptor = new ParameterDescriptor.Builder()
|
||||
.name(parameterName)
|
||||
.build();
|
||||
final Parameter parameter = new Parameter(parameterDescriptor, parameterValue);
|
||||
|
||||
parameterMap.computeIfAbsent(parameterGroupName, key -> new ArrayList<>()).add(parameter);
|
||||
}
|
||||
}
|
||||
} catch (final SQLException e) {
|
||||
getLogger().error("Encountered a database error when fetching parameters: {}", e.getMessage(), e);
|
||||
throw new RuntimeException("Encountered a database error when fetching parameters: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
return parameterMap.entrySet().stream()
|
||||
.map(entry -> new ParameterGroup(entry.getKey(), entry.getValue()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private void validateValueNotNull(final String value, final String columnName) {
|
||||
if (value == null) {
|
||||
throw new IllegalStateException(String.format("Expected %s column to be non-null", columnName));
|
||||
}
|
||||
}
|
||||
|
||||
String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
|
||||
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
|
||||
return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger) {
|
||||
final List<ConfigVerificationResult> results = new ArrayList<>();
|
||||
try {
|
||||
final List<ParameterGroup> parameterGroups = fetchParameters(context);
|
||||
final long parameterCount = parameterGroups.stream()
|
||||
.flatMap(group -> group.getParameters().stream())
|
||||
.count();
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
|
||||
.verificationStepName("Fetch Parameters")
|
||||
.explanation(String.format("Successfully fetched %s Parameter Groups containing %s Parameters matching the filter.", parameterGroups.size(),
|
||||
parameterCount))
|
||||
.build());
|
||||
} catch (final Exception e) {
|
||||
verificationLogger.error("Failed to fetch Parameter Groups", e);
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.outcome(ConfigVerificationResult.Outcome.FAILED)
|
||||
.verificationStepName("Fetch Parameters")
|
||||
.explanation(String.format("Failed to parameters: " + e.getMessage()))
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
}
|
|
@ -14,3 +14,4 @@
|
|||
# limitations under the License.
|
||||
org.apache.nifi.parameter.EnvironmentVariableParameterProvider
|
||||
org.apache.nifi.parameter.FileParameterProvider
|
||||
org.apache.nifi.parameter.DatabaseParameterProvider
|
||||
|
|
|
@ -0,0 +1,214 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en" xmlns="http://www.w3.org/1999/html">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<head>
|
||||
<meta charset="utf-8"/>
|
||||
<title>DatabaseParameterProvider</title>
|
||||
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<h1>Providing Parameters from a Database</h1>
|
||||
|
||||
<p>
|
||||
The DatabaseParameterProvider at its core maps database rows to Parameters, specified by a
|
||||
Parameter Name Column and Parameter Value Column. The Parameter Group name must also be accounted for, and may
|
||||
be specified in different ways using the Parameter Grouping Strategy.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Before discussing the actual configuration, note that in some databases, the words 'PARAMETER', 'PARAMETERS', 'GROUP',
|
||||
and even 'VALUE' are reserved words. If you choose a column name that is a reserved word in the database you are using,
|
||||
make sure to quote it per the database documentation.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Also note that you should use the preferred table name and column name case for your database. For example, Postgres
|
||||
prefers lowercase table and column names, while Oracle prefers capitalized ones. Choosing the appropriate case can
|
||||
avoid unexpected issues in configuring your DatabaseParameterProvider.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The default configuration uses a fully column-based approach, with the Parameter Group Name
|
||||
also specified by columns in the same table. An example of a table using this configuration would be:
|
||||
</p>
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th colspan="4" style="text-align: center">PARAMETER_CONTEXTS</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th><th>PARAMETER_GROUP</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>param.foo</td><td>value-foo</td><td>group_1</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>param.bar</td><td>value-bar</td><td>group_1</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>param.one</td><td>value-one</td><td>group_2</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>param.two</td><td>value-two</td><td>group_2</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
<caption>Table 1: Database table example with Grouping Strategy = Column</caption>
|
||||
</table>
|
||||
|
||||
<p>
|
||||
In order to use the data from this table, set the following Properties:
|
||||
</p>
|
||||
|
||||
<ul>
|
||||
<li><b>Parameter Grouping Strategy</b> - Column</li>
|
||||
<li><b>Table Name</b> - PARAMETER_CONTEXTS</li>
|
||||
<li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
|
||||
<li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
|
||||
<li><b>Parameter Group Name Column</b> - PARAMETER_GROUP</li>
|
||||
</ul>
|
||||
|
||||
<p>
|
||||
Once fetched, the parameters in this example will look like this:
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Parameter Group <b>group_1</b>:
|
||||
<ul>
|
||||
<li>param.foo - value-foo</li>
|
||||
<li>param.bar - value-bar</li>
|
||||
</ul>
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Parameter Group <b>group_2</b>:
|
||||
<ul>
|
||||
<li>param.one - value-one</li>
|
||||
<li>param.two - value-two</li>
|
||||
</ul>
|
||||
</p>
|
||||
|
||||
<h3>Grouping Strategy</h3>
|
||||
|
||||
<p>
|
||||
The default Grouping Strategy is by Column, which allows you to specify the parameter Group name explicitly in the Parameter Group Column.
|
||||
Note that if the value in this column is NULL, an exception will be thrown.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The other Grouping Strategy is by Table, which maps each table to a Parameter Group and sets the Parameter Group Name to the table name.
|
||||
In this Grouping Strategy, the Parameter Group Column is not used. An example configuration using this strategy would be:
|
||||
</p>
|
||||
|
||||
<ul>
|
||||
<li><b>Parameter Grouping Strategy</b> - Table</li>
|
||||
<li><b>Table Names</b> - KAFKA, S3</li>
|
||||
<li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
|
||||
<li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
|
||||
</ul>
|
||||
|
||||
<p>
|
||||
An example of some tables that may be used with this strategy:
|
||||
</p>
|
||||
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th colspan="3" style="text-align: center">KAFKA</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>brokers</td><td>http://localhost:9092</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>topic</td><td>my-topic</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>password</td><td>my-password</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
<caption>Table 2: 'KAFKA' Database table example with Grouping Strategy = Table</caption>
|
||||
</table>
|
||||
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th colspan="3" style="text-align: center">S3</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>bucket</td><td>my-bucket</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>secret.access.key</td><td>my-key</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
<caption>Table 3: 'S3' Database table example with Grouping Strategy = Table</caption>
|
||||
</table>
|
||||
|
||||
<p>
|
||||
Once fetched, the parameters in this example will look like this:
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Parameter Group <b>KAFKA</b>:
|
||||
<ul>
|
||||
<li>brokers - http://localhost:9092</li>
|
||||
<li>topic - my-topic</li>
|
||||
<li>password - my-password</li>
|
||||
</ul>
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Parameter Group <b>S3</b>:
|
||||
<ul>
|
||||
<li>bucket - my-bucket</li>
|
||||
<li>secret.access.key - my-key</li>
|
||||
</ul>
|
||||
</p>
|
||||
|
||||
<h3>Filtering rows</h3>
|
||||
|
||||
<p>
|
||||
If you need to include only some rows in a table as parameters, you can use the 'SQL WHERE clause' property. An example of this is as follows:
|
||||
</p>
|
||||
|
||||
<ul>
|
||||
<li><b>Parameter Grouping Strategy</b> - Table</li>
|
||||
<li><b>Table Names</b> - KAFKA, S3</li>
|
||||
<li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
|
||||
<li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
|
||||
<li><b>SQL WHERE clause</b> - OTHER_COLUMN = 'my-parameters'</li>
|
||||
</ul>
|
||||
|
||||
<p>
|
||||
Here we are assuming there is another column, 'OTHER_COLUMN' in both the KAFKA and S3 tables. Only rows whose 'OTHER_COLUMN' value is 'my-parameters'
|
||||
will then be fetched from these tables.
|
||||
</p>
|
||||
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,303 @@
|
|||
package org.apache.nifi.parameter;/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.dbcp.DBCPService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.MockConfigurationContext;
|
||||
import org.apache.nifi.util.MockParameterProviderInitializationContext;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.mockito.stubbing.OngoingStubbing;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestDatabaseParameterProvider {
|
||||
|
||||
public static final String DBCP_SERVICE = "dbcp-service";
|
||||
public static final String TABLE_NAME = "myTable";
|
||||
private DBCPService dbcpService;
|
||||
|
||||
private DatabaseParameterProvider parameterProvider;
|
||||
|
||||
private MockParameterProviderInitializationContext initializationContext;
|
||||
|
||||
private Map<PropertyDescriptor, String> columnBasedProperties;
|
||||
|
||||
private Map<PropertyDescriptor, String> nonColumnBasedProperties;
|
||||
|
||||
@Before
|
||||
public void init() throws InitializationException {
|
||||
dbcpService = mock(DBCPService.class);
|
||||
|
||||
final DatabaseParameterProvider rawProvider = new DatabaseParameterProvider();
|
||||
initializationContext = new MockParameterProviderInitializationContext("id", "name", mock(ComponentLog.class));
|
||||
initializationContext.addControllerService(dbcpService, DBCP_SERVICE);
|
||||
rawProvider.initialize(initializationContext);
|
||||
parameterProvider = spy(rawProvider);
|
||||
// Return the table name
|
||||
doAnswer(invocationOnMock -> invocationOnMock.getArgument(1)).when(parameterProvider).getQuery(any(), any(), any(), any());
|
||||
|
||||
columnBasedProperties = new HashMap<>();
|
||||
|
||||
columnBasedProperties.put(DatabaseParameterProvider.DBCP_SERVICE, DBCP_SERVICE);
|
||||
columnBasedProperties.put(DatabaseParameterProvider.DB_TYPE, "Generic");
|
||||
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY, DatabaseParameterProvider.GROUPING_BY_COLUMN.getValue());
|
||||
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN, "group");
|
||||
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_NAME_COLUMN, "name");
|
||||
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_VALUE_COLUMN, "value");
|
||||
columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME, TABLE_NAME);
|
||||
|
||||
nonColumnBasedProperties = new HashMap<>();
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.DBCP_SERVICE, DBCP_SERVICE);
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.DB_TYPE, "Generic");
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY, DatabaseParameterProvider.GROUPING_BY_TABLE_NAME.getValue());
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_NAME_COLUMN, "name");
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_VALUE_COLUMN, "value");
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES, "KAFKA, S3");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testColumnStrategies() throws SQLException {
|
||||
runColumnStrategiesTest(columnBasedProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testColumnStrategiesWithExtraProperties() throws SQLException {
|
||||
// Ensure setting the unrelated properties don't break anything
|
||||
columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES, "a,b");
|
||||
runColumnStrategiesTest(columnBasedProperties);
|
||||
}
|
||||
|
||||
private void runColumnStrategiesTest(final Map<PropertyDescriptor, String> properties) throws SQLException {
|
||||
final List<Map<String, String>> rows = Arrays.asList(
|
||||
new HashMap<String, String>() { {
|
||||
put("group", "Kafka"); put("name", "brokers"); put("value", "my-brokers"); put("unrelated_column", "unrelated_value");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("group", "Kafka"); put("name", "topic"); put("value", "my-topic"); put("unrelated_column", "unrelated_value");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("group", "Kafka"); put("name", "password"); put("value", "my-password"); put("unrelated_column", "unrelated_value");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("group", "S3"); put("name", "bucket"); put("value", "my-bucket"); put("unrelated_column", "unrelated_value");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("group", "S3"); put("name", "s3-password"); put("value", "my-s3-password"); put("unrelated_column", "unrelated_value");
|
||||
} }
|
||||
);
|
||||
mockTableResults(new MockTable(TABLE_NAME, rows));
|
||||
|
||||
final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext);
|
||||
final List<ParameterGroup> groups = parameterProvider.fetchParameters(context);
|
||||
assertEquals(2, groups.size());
|
||||
|
||||
for (final ParameterGroup group : groups) {
|
||||
final String groupName = group.getGroupName();
|
||||
if (groupName.equals("S3")) {
|
||||
final Parameter parameter = group.getParameters().iterator().next();
|
||||
assertEquals("bucket", parameter.getDescriptor().getName());
|
||||
assertEquals("my-bucket", parameter.getValue());
|
||||
assertFalse(parameter.getDescriptor().isSensitive());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonColumnStrategies() throws SQLException {
|
||||
runNonColumnStrategyTest(nonColumnBasedProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonColumnStrategiesWithExtraProperties() throws SQLException {
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME, TABLE_NAME);
|
||||
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN, "group");
|
||||
runNonColumnStrategyTest(nonColumnBasedProperties);
|
||||
}
|
||||
|
||||
private void runNonColumnStrategyTest(final Map<PropertyDescriptor, String> properties) throws SQLException {
|
||||
final List<Map<String, String>> kafkaRows = Arrays.asList(
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "nifi_brokers"); put("value", "my-brokers");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "nifi_topic"); put("value", "my-topic");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "unrelated_field"); put("value", "my-value");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "kafka_password"); put("value", "my-password");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "nifi_password"); put("value", "my-nifi-password");
|
||||
} }
|
||||
);
|
||||
final List<Map<String, String>> s3Rows = Arrays.asList(
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "nifi_s3_bucket"); put("value", "my-bucket");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "s3_password"); put("value", "my-password");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "nifi_other_field"); put("value", "my-field");
|
||||
} },
|
||||
new HashMap<String, String>() { {
|
||||
put("name", "other_password"); put("value", "my-password");
|
||||
} }
|
||||
);
|
||||
mockTableResults(new MockTable("KAFKA", kafkaRows), new MockTable("S3", s3Rows));
|
||||
|
||||
final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext);
|
||||
final List<ParameterGroup> groups = parameterProvider.fetchParameters(context);
|
||||
assertEquals(2, groups.size());
|
||||
|
||||
for (final ParameterGroup group : groups) {
|
||||
if (group.getGroupName().equals("KAFKA")) {
|
||||
assertTrue(group.getParameters().stream()
|
||||
.filter(parameter -> parameter.getDescriptor().getName().equals("nifi_brokers"))
|
||||
.anyMatch(parameter -> parameter.getValue().equals("my-brokers")));
|
||||
} else {
|
||||
assertTrue(group.getParameters().stream()
|
||||
.filter(parameter -> parameter.getDescriptor().getName().equals("nifi_s3_bucket"))
|
||||
.anyMatch(parameter -> parameter.getValue().equals("my-bucket")));
|
||||
}
|
||||
}
|
||||
final Set<String> allParameterNames = groups.stream()
|
||||
.flatMap(group -> group.getParameters().stream())
|
||||
.map(parameter -> parameter.getDescriptor().getName())
|
||||
.collect(Collectors.toSet());
|
||||
assertEquals(new HashSet<>(Arrays.asList("nifi_brokers", "nifi_topic", "kafka_password", "nifi_password",
|
||||
"s3_password", "nifi_s3_bucket", "unrelated_field", "nifi_other_field", "other_password")), allParameterNames);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullNameColumn() throws SQLException {
|
||||
mockTableResults(new MockTable(TABLE_NAME,
|
||||
Arrays.asList(new HashMap<String, String>() { { put("name", null); } })));
|
||||
runTestWithExpectedFailure(columnBasedProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullGroupNameColumn() throws SQLException {
|
||||
mockTableResults(new MockTable(TABLE_NAME,
|
||||
Arrays.asList(new HashMap<String, String>() { { put("name", "param"); put("value", "value"); put("group", null); } })));
|
||||
runTestWithExpectedFailure(columnBasedProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullValueColumn() throws SQLException {
|
||||
mockTableResults(new MockTable(TABLE_NAME,
|
||||
Arrays.asList(new HashMap<String, String>() { { put("name", "param"); put("value", null); } })));
|
||||
runTestWithExpectedFailure(columnBasedProperties);
|
||||
}
|
||||
|
||||
public void runTestWithExpectedFailure(final Map<PropertyDescriptor, String> properties) {
|
||||
final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext);
|
||||
assertThrows(IllegalStateException.class, () -> parameterProvider.fetchParameters(context));
|
||||
}
|
||||
|
||||
private void mockTableResults(final MockTable... mockTables) throws SQLException {
|
||||
final Connection connection = mock(Connection.class);
|
||||
when(dbcpService.getConnection(any(Map.class))).thenReturn(connection);
|
||||
|
||||
OngoingStubbing<Statement> statementStubbing = null;
|
||||
for (final MockTable mockTable : mockTables) {
|
||||
final ResultSet resultSet = mock(ResultSet.class);
|
||||
final ResultSetAnswer resultSetAnswer = new ResultSetAnswer(mockTable.rows);
|
||||
when(resultSet.next()).thenAnswer(resultSetAnswer);
|
||||
|
||||
when(resultSet.getString(anyString())).thenAnswer(invocationOnMock -> resultSetAnswer.getValue(invocationOnMock.getArgument(0)));
|
||||
|
||||
final Statement statement = mock(Statement.class);
|
||||
when(statement.executeQuery(ArgumentMatchers.contains(mockTable.tableName))).thenReturn(resultSet);
|
||||
|
||||
if (statementStubbing == null) {
|
||||
statementStubbing = when(connection.createStatement()).thenReturn(statement);
|
||||
} else {
|
||||
statementStubbing = statementStubbing.thenReturn(statement);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class MockTable {
|
||||
private final String tableName;
|
||||
|
||||
private final List<Map<String, String>> rows;
|
||||
|
||||
private MockTable(final String tableName, final List<Map<String, String>> rows) {
|
||||
this.tableName = tableName;
|
||||
this.rows = rows;
|
||||
}
|
||||
}
|
||||
|
||||
private class ResultSetAnswer implements Answer<Boolean> {
|
||||
private final List<java.util.Map<String, String>> rows;
|
||||
|
||||
private Iterator<java.util.Map<String, String>> rowIterator;
|
||||
private java.util.Map<String, String> currentRow;
|
||||
|
||||
private ResultSetAnswer(final List<java.util.Map<String, String>> rows) {
|
||||
this.rows = rows;
|
||||
this.rowIterator = rows.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean answer(final InvocationOnMock invocationOnMock) {
|
||||
final boolean hasNext = rowIterator.hasNext();
|
||||
if (hasNext) {
|
||||
currentRow = rowIterator.next();
|
||||
} else {
|
||||
currentRow = null;
|
||||
}
|
||||
return hasNext;
|
||||
}
|
||||
|
||||
String getValue(final String column) {
|
||||
return currentRow.get(column);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue