From 094222260c7cb7dd839520abb2dcf3bcec316ea5 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Sat, 10 Sep 2022 08:40:54 -0400 Subject: [PATCH] NIFI-9402: Adding DatabaseParameterProvider Adding provided scope to api dependency in nifi-standard-parameter-providers module Adding additional documentation, other minor code cleanup Correcting error handling in StandardParameterProviderNode, updating additional details for DatabaseParameterProvider Correcting null columm value handling NIFI-9402: Fixed Checkstyle violation Signed-off-by: Matthew Burgess This closes #6391 --- .../StandardParameterProviderNode.java | 4 +- .../nifi-standard-parameter-providers/pom.xml | 11 + .../parameter/DatabaseParameterProvider.java | 265 +++++++++++++++ ...rg.apache.nifi.parameter.ParameterProvider | 1 + .../additionalDetails.html | 214 +++++++++++++ .../TestDatabaseParameterProvider.java | 303 ++++++++++++++++++ 6 files changed, 796 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java index a1651e6376..a35fbea3da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java @@ -277,8 +277,8 @@ public class StandardParameterProviderNode extends AbstractComponentNode impleme List fetchedParameterGroups; try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), parameterProvider.getClass(), parameterProvider.getIdentifier())) { fetchedParameterGroups = parameterProvider.fetchParameters(configurationContext); - } catch (final IOException e) { - throw new RuntimeException(String.format("Error fetching parameters for %s", this), e); + } catch (final IOException | RuntimeException e) { + throw new IllegalStateException(String.format("Error fetching parameters for %s: %s", this, e.getMessage()), e); } if (fetchedParameterGroups == null || fetchedParameterGroups.isEmpty()) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml index cc5d40a7da..16111bda50 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml @@ -30,6 +30,17 @@ commons-io commons-io + + org.apache.nifi + nifi-dbcp-service-api + ${project.version} + provided + + + org.apache.nifi + nifi-standard-processors + ${project.version} + org.apache.nifi nifi-utils diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java new file mode 100644 index 0000000000..0bb924350a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parameter; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.util.StringUtils; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +@Tags({"database", "dbcp", "sql"}) +@CapabilityDescription("Fetches parameters from database tables") + +public class DatabaseParameterProvider extends AbstractParameterProvider implements VerifiableParameterProvider { + + protected final static Map dbAdapters = new HashMap<>(); + + public static final PropertyDescriptor DB_TYPE; + + static { + // Load the DatabaseAdapters + ArrayList dbAdapterValues = new ArrayList<>(); + ServiceLoader dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class); + dbAdapterLoader.forEach(it -> { + dbAdapters.put(it.getName(), it); + dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription())); + }); + + DB_TYPE = new PropertyDescriptor.Builder() + .name("db-type") + .displayName("Database Type") + .description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type " + + "should suffice, but some databases (such as Oracle) require custom SQL clauses. ") + .allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()])) + .defaultValue("Generic") + .required(true) + .build(); + } + + static AllowableValue GROUPING_BY_COLUMN = new AllowableValue("grouping-by-column", "Column", + "A single table is partitioned by the 'Parameter Group Name Column'. All rows with the same value in this column will " + + "map to a group of the same name."); + static AllowableValue GROUPING_BY_TABLE_NAME = new AllowableValue("grouping-by-table-name", "Table Name", + "An entire table maps to a Parameter Group. The group name will be the table name."); + + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("dbcp-service") + .displayName("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain a connection to the database.") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor PARAMETER_GROUPING_STRATEGY = new PropertyDescriptor.Builder() + .name("parameter-grouping-strategy") + .displayName("Parameter Grouping Strategy") + .description("The strategy used to group parameters.") + .required(true) + .allowableValues(GROUPING_BY_COLUMN, GROUPING_BY_TABLE_NAME) + .defaultValue(GROUPING_BY_COLUMN.getValue()) + .build(); + + public static final PropertyDescriptor TABLE_NAMES = new PropertyDescriptor.Builder() + .name("table-names") + .displayName("Table Names") + .description("A comma-separated list of names of the database tables containing the parameters.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_TABLE_NAME) + .build(); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("table-name") + .displayName("Table Name") + .description("The name of the database table containing the parameters.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN) + .build(); + + public static final PropertyDescriptor PARAMETER_NAME_COLUMN = new PropertyDescriptor.Builder() + .name("parameter-name-column") + .displayName("Parameter Name Column") + .description("The name of a column containing the parameter name.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PARAMETER_VALUE_COLUMN = new PropertyDescriptor.Builder() + .name("parameter-value-column") + .displayName("Parameter Value Column") + .description("The name of a column containing the parameter value.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + public static final PropertyDescriptor PARAMETER_GROUP_NAME_COLUMN = new PropertyDescriptor.Builder() + .name("parameter-group-name-column") + .displayName("Parameter Group Name Column") + .description("The name of a column containing the name of the parameter group into which the parameter should be mapped.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN) + .build(); + + public static final PropertyDescriptor SQL_WHERE_CLAUSE = new PropertyDescriptor.Builder() + .name("sql-where-clause") + .displayName("SQL WHERE clause") + .description("A optional SQL query 'WHERE' clause by which to filter all results. The 'WHERE' keyword should not be included.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private List properties; + + @Override + protected void init(final ParameterProviderInitializationContext config) { + final List properties = new ArrayList<>(); + properties.add(DB_TYPE); + properties.add(DBCP_SERVICE); + properties.add(PARAMETER_GROUPING_STRATEGY); + properties.add(TABLE_NAME); + properties.add(TABLE_NAMES); + properties.add(PARAMETER_NAME_COLUMN); + properties.add(PARAMETER_VALUE_COLUMN); + properties.add(PARAMETER_GROUP_NAME_COLUMN); + properties.add(SQL_WHERE_CLAUSE); + + this.properties = Collections.unmodifiableList(properties); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public List fetchParameters(final ConfigurationContext context) { + final boolean groupByColumn = GROUPING_BY_COLUMN.getValue().equals(context.getProperty(PARAMETER_GROUPING_STRATEGY).getValue()); + + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String whereClause = context.getProperty(SQL_WHERE_CLAUSE).getValue(); + final String parameterNameColumn = context.getProperty(PARAMETER_NAME_COLUMN).getValue(); + final String parameterValueColumn = context.getProperty(PARAMETER_VALUE_COLUMN).getValue(); + final String parameterGroupNameColumn = context.getProperty(PARAMETER_GROUP_NAME_COLUMN).getValue(); + + final List tableNames = groupByColumn + ? Collections.singletonList(context.getProperty(TABLE_NAME).getValue()) + : Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).collect(Collectors.toList()); + + final Map> parameterMap = new HashMap<>(); + for (final String tableName : tableNames) { + try (final Connection con = dbcpService.getConnection(Collections.emptyMap()); final Statement st = con.createStatement()) { + final List columns = new ArrayList<>(); + columns.add(parameterNameColumn); + columns.add(parameterValueColumn); + if (groupByColumn) { + columns.add(parameterGroupNameColumn); + } + final String query = getQuery(context, tableName, columns, whereClause); + + getLogger().info("Fetching parameters with query: " + query); + try (final ResultSet rs = st.executeQuery(query)) { + while (rs.next()) { + final String parameterName = rs.getString(parameterNameColumn); + final String parameterValue = rs.getString(parameterValueColumn); + + validateValueNotNull(parameterName, parameterNameColumn); + validateValueNotNull(parameterValue, parameterValueColumn); + final String parameterGroupName; + if (groupByColumn) { + parameterGroupName = parameterGroupNameColumn == null ? null : rs.getString(parameterGroupNameColumn); + validateValueNotNull(parameterGroupName, parameterGroupNameColumn); + } else { + parameterGroupName = tableName; + } + + final ParameterDescriptor parameterDescriptor = new ParameterDescriptor.Builder() + .name(parameterName) + .build(); + final Parameter parameter = new Parameter(parameterDescriptor, parameterValue); + + parameterMap.computeIfAbsent(parameterGroupName, key -> new ArrayList<>()).add(parameter); + } + } + } catch (final SQLException e) { + getLogger().error("Encountered a database error when fetching parameters: {}", e.getMessage(), e); + throw new RuntimeException("Encountered a database error when fetching parameters: " + e.getMessage(), e); + } + } + + return parameterMap.entrySet().stream() + .map(entry -> new ParameterGroup(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + } + + private void validateValueNotNull(final String value, final String columnName) { + if (value == null) { + throw new IllegalStateException(String.format("Expected %s column to be non-null", columnName)); + } + } + + String getQuery(final ConfigurationContext context, final String tableName, final List columns, final String whereClause) { + final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); + return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null); + } + + @Override + public List verify(final ConfigurationContext context, final ComponentLog verificationLogger) { + final List results = new ArrayList<>(); + try { + final List parameterGroups = fetchParameters(context); + final long parameterCount = parameterGroups.stream() + .flatMap(group -> group.getParameters().stream()) + .count(); + results.add(new ConfigVerificationResult.Builder() + .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .verificationStepName("Fetch Parameters") + .explanation(String.format("Successfully fetched %s Parameter Groups containing %s Parameters matching the filter.", parameterGroups.size(), + parameterCount)) + .build()); + } catch (final Exception e) { + verificationLogger.error("Failed to fetch Parameter Groups", e); + results.add(new ConfigVerificationResult.Builder() + .outcome(ConfigVerificationResult.Outcome.FAILED) + .verificationStepName("Fetch Parameters") + .explanation(String.format("Failed to parameters: " + e.getMessage())) + .build()); + } + + return results; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider index 77ce897ba2..3b17ef9236 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider @@ -14,3 +14,4 @@ # limitations under the License. org.apache.nifi.parameter.EnvironmentVariableParameterProvider org.apache.nifi.parameter.FileParameterProvider +org.apache.nifi.parameter.DatabaseParameterProvider diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html new file mode 100644 index 0000000000..b349f8aa7e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html @@ -0,0 +1,214 @@ + + + + + + + DatabaseParameterProvider + + + + +

Providing Parameters from a Database

+ +

+ The DatabaseParameterProvider at its core maps database rows to Parameters, specified by a + Parameter Name Column and Parameter Value Column. The Parameter Group name must also be accounted for, and may + be specified in different ways using the Parameter Grouping Strategy. +

+ +

+ Before discussing the actual configuration, note that in some databases, the words 'PARAMETER', 'PARAMETERS', 'GROUP', + and even 'VALUE' are reserved words. If you choose a column name that is a reserved word in the database you are using, + make sure to quote it per the database documentation. +

+ +

+ Also note that you should use the preferred table name and column name case for your database. For example, Postgres + prefers lowercase table and column names, while Oracle prefers capitalized ones. Choosing the appropriate case can + avoid unexpected issues in configuring your DatabaseParameterProvider. +

+ +

+ The default configuration uses a fully column-based approach, with the Parameter Group Name + also specified by columns in the same table. An example of a table using this configuration would be: +

+ + + + + + + + + + + + + + + + + + + + + + + + +
PARAMETER_CONTEXTS
PARAMETER_NAMEPARAMETER_VALUEPARAMETER_GROUP
param.foovalue-foogroup_1
param.barvalue-bargroup_1
param.onevalue-onegroup_2
param.twovalue-twogroup_2
Table 1: Database table example with Grouping Strategy = Column
+ +

+ In order to use the data from this table, set the following Properties: +

+ +
    +
  • Parameter Grouping Strategy - Column
  • +
  • Table Name - PARAMETER_CONTEXTS
  • +
  • Parameter Name Column - PARAMETER_NAME
  • +
  • Parameter Value Column - PARAMETER_VALUE
  • +
  • Parameter Group Name Column - PARAMETER_GROUP
  • +
+ +

+ Once fetched, the parameters in this example will look like this: +

+ +

+ Parameter Group group_1: +

    +
  • param.foo - value-foo
  • +
  • param.bar - value-bar
  • +
+

+ +

+ Parameter Group group_2: +

    +
  • param.one - value-one
  • +
  • param.two - value-two
  • +
+

+ +

Grouping Strategy

+ +

+ The default Grouping Strategy is by Column, which allows you to specify the parameter Group name explicitly in the Parameter Group Column. + Note that if the value in this column is NULL, an exception will be thrown. +

+ +

+ The other Grouping Strategy is by Table, which maps each table to a Parameter Group and sets the Parameter Group Name to the table name. + In this Grouping Strategy, the Parameter Group Column is not used. An example configuration using this strategy would be: +

+ +
    +
  • Parameter Grouping Strategy - Table
  • +
  • Table Names - KAFKA, S3
  • +
  • Parameter Name Column - PARAMETER_NAME
  • +
  • Parameter Value Column - PARAMETER_VALUE
  • +
+ +

+ An example of some tables that may be used with this strategy: +

+ + + + + + + + + + + + + + + + + + + + + + +
KAFKA
PARAMETER_NAMEPARAMETER_VALUE
brokershttp://localhost:9092
topicmy-topic
passwordmy-password
Table 2: 'KAFKA' Database table example with Grouping Strategy = Table
+ + + + + + + + + + + + + + + + + + + +
S3
PARAMETER_NAMEPARAMETER_VALUE
bucketmy-bucket
secret.access.keymy-key
Table 3: 'S3' Database table example with Grouping Strategy = Table
+ +

+ Once fetched, the parameters in this example will look like this: +

+ +

+ Parameter Group KAFKA: +

    +
  • brokers - http://localhost:9092
  • +
  • topic - my-topic
  • +
  • password - my-password
  • +
+

+ +

+ Parameter Group S3: +

    +
  • bucket - my-bucket
  • +
  • secret.access.key - my-key
  • +
+

+ +

Filtering rows

+ +

+ If you need to include only some rows in a table as parameters, you can use the 'SQL WHERE clause' property. An example of this is as follows: +

+ +
    +
  • Parameter Grouping Strategy - Table
  • +
  • Table Names - KAFKA, S3
  • +
  • Parameter Name Column - PARAMETER_NAME
  • +
  • Parameter Value Column - PARAMETER_VALUE
  • +
  • SQL WHERE clause - OTHER_COLUMN = 'my-parameters'
  • +
+ +

+ Here we are assuming there is another column, 'OTHER_COLUMN' in both the KAFKA and S3 tables. Only rows whose 'OTHER_COLUMN' value is 'my-parameters' + will then be fetched from these tables. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java new file mode 100644 index 0000000000..9e6aae4acd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java @@ -0,0 +1,303 @@ +package org.apache.nifi.parameter;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockParameterProviderInitializationContext; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestDatabaseParameterProvider { + + public static final String DBCP_SERVICE = "dbcp-service"; + public static final String TABLE_NAME = "myTable"; + private DBCPService dbcpService; + + private DatabaseParameterProvider parameterProvider; + + private MockParameterProviderInitializationContext initializationContext; + + private Map columnBasedProperties; + + private Map nonColumnBasedProperties; + + @Before + public void init() throws InitializationException { + dbcpService = mock(DBCPService.class); + + final DatabaseParameterProvider rawProvider = new DatabaseParameterProvider(); + initializationContext = new MockParameterProviderInitializationContext("id", "name", mock(ComponentLog.class)); + initializationContext.addControllerService(dbcpService, DBCP_SERVICE); + rawProvider.initialize(initializationContext); + parameterProvider = spy(rawProvider); + // Return the table name + doAnswer(invocationOnMock -> invocationOnMock.getArgument(1)).when(parameterProvider).getQuery(any(), any(), any(), any()); + + columnBasedProperties = new HashMap<>(); + + columnBasedProperties.put(DatabaseParameterProvider.DBCP_SERVICE, DBCP_SERVICE); + columnBasedProperties.put(DatabaseParameterProvider.DB_TYPE, "Generic"); + columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY, DatabaseParameterProvider.GROUPING_BY_COLUMN.getValue()); + columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN, "group"); + columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_NAME_COLUMN, "name"); + columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_VALUE_COLUMN, "value"); + columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME, TABLE_NAME); + + nonColumnBasedProperties = new HashMap<>(); + nonColumnBasedProperties.put(DatabaseParameterProvider.DBCP_SERVICE, DBCP_SERVICE); + nonColumnBasedProperties.put(DatabaseParameterProvider.DB_TYPE, "Generic"); + nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY, DatabaseParameterProvider.GROUPING_BY_TABLE_NAME.getValue()); + nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_NAME_COLUMN, "name"); + nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_VALUE_COLUMN, "value"); + nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES, "KAFKA, S3"); + } + + @Test + public void testColumnStrategies() throws SQLException { + runColumnStrategiesTest(columnBasedProperties); + } + + @Test + public void testColumnStrategiesWithExtraProperties() throws SQLException { + // Ensure setting the unrelated properties don't break anything + columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES, "a,b"); + runColumnStrategiesTest(columnBasedProperties); + } + + private void runColumnStrategiesTest(final Map properties) throws SQLException { + final List> rows = Arrays.asList( + new HashMap() { { + put("group", "Kafka"); put("name", "brokers"); put("value", "my-brokers"); put("unrelated_column", "unrelated_value"); + } }, + new HashMap() { { + put("group", "Kafka"); put("name", "topic"); put("value", "my-topic"); put("unrelated_column", "unrelated_value"); + } }, + new HashMap() { { + put("group", "Kafka"); put("name", "password"); put("value", "my-password"); put("unrelated_column", "unrelated_value"); + } }, + new HashMap() { { + put("group", "S3"); put("name", "bucket"); put("value", "my-bucket"); put("unrelated_column", "unrelated_value"); + } }, + new HashMap() { { + put("group", "S3"); put("name", "s3-password"); put("value", "my-s3-password"); put("unrelated_column", "unrelated_value"); + } } + ); + mockTableResults(new MockTable(TABLE_NAME, rows)); + + final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext); + final List groups = parameterProvider.fetchParameters(context); + assertEquals(2, groups.size()); + + for (final ParameterGroup group : groups) { + final String groupName = group.getGroupName(); + if (groupName.equals("S3")) { + final Parameter parameter = group.getParameters().iterator().next(); + assertEquals("bucket", parameter.getDescriptor().getName()); + assertEquals("my-bucket", parameter.getValue()); + assertFalse(parameter.getDescriptor().isSensitive()); + } + } + } + + @Test + public void testNonColumnStrategies() throws SQLException { + runNonColumnStrategyTest(nonColumnBasedProperties); + } + + @Test + public void testNonColumnStrategiesWithExtraProperties() throws SQLException { + nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME, TABLE_NAME); + nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN, "group"); + runNonColumnStrategyTest(nonColumnBasedProperties); + } + + private void runNonColumnStrategyTest(final Map properties) throws SQLException { + final List> kafkaRows = Arrays.asList( + new HashMap() { { + put("name", "nifi_brokers"); put("value", "my-brokers"); + } }, + new HashMap() { { + put("name", "nifi_topic"); put("value", "my-topic"); + } }, + new HashMap() { { + put("name", "unrelated_field"); put("value", "my-value"); + } }, + new HashMap() { { + put("name", "kafka_password"); put("value", "my-password"); + } }, + new HashMap() { { + put("name", "nifi_password"); put("value", "my-nifi-password"); + } } + ); + final List> s3Rows = Arrays.asList( + new HashMap() { { + put("name", "nifi_s3_bucket"); put("value", "my-bucket"); + } }, + new HashMap() { { + put("name", "s3_password"); put("value", "my-password"); + } }, + new HashMap() { { + put("name", "nifi_other_field"); put("value", "my-field"); + } }, + new HashMap() { { + put("name", "other_password"); put("value", "my-password"); + } } + ); + mockTableResults(new MockTable("KAFKA", kafkaRows), new MockTable("S3", s3Rows)); + + final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext); + final List groups = parameterProvider.fetchParameters(context); + assertEquals(2, groups.size()); + + for (final ParameterGroup group : groups) { + if (group.getGroupName().equals("KAFKA")) { + assertTrue(group.getParameters().stream() + .filter(parameter -> parameter.getDescriptor().getName().equals("nifi_brokers")) + .anyMatch(parameter -> parameter.getValue().equals("my-brokers"))); + } else { + assertTrue(group.getParameters().stream() + .filter(parameter -> parameter.getDescriptor().getName().equals("nifi_s3_bucket")) + .anyMatch(parameter -> parameter.getValue().equals("my-bucket"))); + } + } + final Set allParameterNames = groups.stream() + .flatMap(group -> group.getParameters().stream()) + .map(parameter -> parameter.getDescriptor().getName()) + .collect(Collectors.toSet()); + assertEquals(new HashSet<>(Arrays.asList("nifi_brokers", "nifi_topic", "kafka_password", "nifi_password", + "s3_password", "nifi_s3_bucket", "unrelated_field", "nifi_other_field", "other_password")), allParameterNames); + } + + @Test + public void testNullNameColumn() throws SQLException { + mockTableResults(new MockTable(TABLE_NAME, + Arrays.asList(new HashMap() { { put("name", null); } }))); + runTestWithExpectedFailure(columnBasedProperties); + } + + @Test + public void testNullGroupNameColumn() throws SQLException { + mockTableResults(new MockTable(TABLE_NAME, + Arrays.asList(new HashMap() { { put("name", "param"); put("value", "value"); put("group", null); } }))); + runTestWithExpectedFailure(columnBasedProperties); + } + + @Test + public void testNullValueColumn() throws SQLException { + mockTableResults(new MockTable(TABLE_NAME, + Arrays.asList(new HashMap() { { put("name", "param"); put("value", null); } }))); + runTestWithExpectedFailure(columnBasedProperties); + } + + public void runTestWithExpectedFailure(final Map properties) { + final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext); + assertThrows(IllegalStateException.class, () -> parameterProvider.fetchParameters(context)); + } + + private void mockTableResults(final MockTable... mockTables) throws SQLException { + final Connection connection = mock(Connection.class); + when(dbcpService.getConnection(any(Map.class))).thenReturn(connection); + + OngoingStubbing statementStubbing = null; + for (final MockTable mockTable : mockTables) { + final ResultSet resultSet = mock(ResultSet.class); + final ResultSetAnswer resultSetAnswer = new ResultSetAnswer(mockTable.rows); + when(resultSet.next()).thenAnswer(resultSetAnswer); + + when(resultSet.getString(anyString())).thenAnswer(invocationOnMock -> resultSetAnswer.getValue(invocationOnMock.getArgument(0))); + + final Statement statement = mock(Statement.class); + when(statement.executeQuery(ArgumentMatchers.contains(mockTable.tableName))).thenReturn(resultSet); + + if (statementStubbing == null) { + statementStubbing = when(connection.createStatement()).thenReturn(statement); + } else { + statementStubbing = statementStubbing.thenReturn(statement); + } + } + } + + private class MockTable { + private final String tableName; + + private final List> rows; + + private MockTable(final String tableName, final List> rows) { + this.tableName = tableName; + this.rows = rows; + } + } + + private class ResultSetAnswer implements Answer { + private final List> rows; + + private Iterator> rowIterator; + private java.util.Map currentRow; + + private ResultSetAnswer(final List> rows) { + this.rows = rows; + this.rowIterator = rows.iterator(); + } + + @Override + public Boolean answer(final InvocationOnMock invocationOnMock) { + final boolean hasNext = rowIterator.hasNext(); + if (hasNext) { + currentRow = rowIterator.next(); + } else { + currentRow = null; + } + return hasNext; + } + + String getValue(final String column) { + return currentRow.get(column); + } + } +}