From 08ff54f5fb712d01f372fd25a2d904d605e271ba Mon Sep 17 00:00:00 2001 From: Jim Steinebrey Date: Tue, 19 Mar 2024 11:54:38 -0400 Subject: [PATCH] Add auto commit property to QueryDatabaseTable and QueryDatabaseTable processors to allow disabling auto commit so PostgreSQL Fetch Size will work NIFI-1931 Add proper default value for auto commit (false) to PostgreSQLDatabaseAdapter to allow FETCH_SIZE to be honored on reads. NIFI-1931 Added customValidate code to check the auto commit property setting against the db adapter's required auto commit setting and give validation error message if they do not match. NIFI-1931 Added automated test to check the Auto Commit customValidate error message. NIFI-1931 remove clearDefaultValue() because it is not needed since required = false a;ready defaults it to null. This closes #8534 Signed-off-by: Matt Burgess --- .../standard/AbstractQueryDatabaseTable.java | 71 +++++++++++- .../standard/QueryDatabaseTable.java | 1 + .../standard/QueryDatabaseTableRecord.java | 1 + .../standard/db/DatabaseAdapter.java | 13 +++ .../db/impl/PostgreSQLDatabaseAdapter.java | 18 ++++ .../standard/QueryDatabaseTableIT.java | 78 ++++++++++++++ .../standard/QueryDatabaseTableRecordIT.java | 78 ++++++++++++++ .../QueryDatabaseTableRecordTest.java | 97 +++++++++++++++-- .../standard/QueryDatabaseTableTest.java | 101 ++++++++++++++++-- .../impl/TestPostgreSQLDatabaseAdapter.java | 16 +++ 10 files changed, 456 insertions(+), 18 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java index e5fc6745d6..7f7a870fb7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java @@ -90,16 +90,34 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr "TRANSACTION_SERIALIZABLE" ); + private static final String FETCH_SIZE_NAME = "Fetch Size"; + private static final String AUTO_COMMIT_NAME = "Set Auto Commit"; + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() - .name("Fetch Size") + .name(FETCH_SIZE_NAME) .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be " - + "honored and/or exact. If the value specified is zero, then the hint is ignored.") + + "honored and/or exact. If the value specified is zero, then the hint is ignored. " + + "If using PostgreSQL, then '" + AUTO_COMMIT_NAME + "' must be equal to 'false' to cause '" + FETCH_SIZE_NAME + "' to take effect.") .defaultValue("0") .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .build(); + public static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder() + .name(AUTO_COMMIT_NAME) + .description("Allows enabling or disabling the auto commit functionality of the DB connection. Default value is 'No value set'. " + + "'No value set' will leave the db connection's auto commit mode unchanged. " + + "For some JDBC drivers such as PostgreSQL driver, it is required to disable the auto commit functionality " + + "to get the '" + FETCH_SIZE_NAME + "' setting to take effect. " + + "When auto commit is enabled, PostgreSQL driver ignores '" + FETCH_SIZE_NAME + "' setting and loads all rows of the result set to memory at once. " + + "This could lead for a large amount of memory usage when executing queries which fetch large data sets. " + + "More Details of this behaviour in PostgreSQL driver can be found in https://jdbc.postgresql.org//documentation/head/query.html.") + .allowableValues("true", "false") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .required(false) + .build(); + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() .name("qdbt-max-rows") .displayName("Max Rows Per Flow File") @@ -196,6 +214,23 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr .build()); } + final Boolean propertyAutoCommit = validationContext.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean(); + final Integer fetchSize = validationContext.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); + final DatabaseAdapter dbAdapter = dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue()); + final Boolean adapterAutoCommit = dbAdapter == null + ? null + : dbAdapter.getAutoCommitForReads(fetchSize).orElse(null); + if (adapterAutoCommit != null && propertyAutoCommit != null + && propertyAutoCommit != adapterAutoCommit ) { + results.add(new ValidationResult.Builder().valid(false) + .subject(AUTO_COMMIT.getDisplayName()) + .input(String.valueOf(propertyAutoCommit)) + .explanation(String.format("'%s' must be set to '%s' because '%s' %s requires it to be '%s'", + AUTO_COMMIT.getDisplayName(), adapterAutoCommit, + dbAdapter.getName(), DB_TYPE.getDisplayName(), adapterAutoCommit)) + .build()); + } + return results; } @@ -304,7 +339,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr } } } catch (final Exception e) { - logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectMaxQuery, e}); + logger.error("Unable to execute SQL select query {} due to {}", selectMaxQuery, e); context.yield(); } } @@ -343,6 +378,24 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr if (logger.isDebugEnabled()) { logger.debug("Executing query {}", new Object[] { selectQuery }); } + + final boolean originalAutoCommit = con.getAutoCommit(); + final Boolean propertyAutoCommitValue = context.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean(); + // If user sets AUTO_COMMIT property to non-null (i.e. true or false), then the property value overrides the dbAdapter's value + final Boolean setAutoCommitValue = + dbAdapter == null || propertyAutoCommitValue != null + ? propertyAutoCommitValue + : dbAdapter.getAutoCommitForReads(fetchSize).orElse(null); + if (setAutoCommitValue != null && originalAutoCommit != setAutoCommitValue) { + try { + con.setAutoCommit(setAutoCommitValue); + logger.debug("Driver connection changed to setAutoCommit({})", setAutoCommitValue); + } catch (Exception ex) { + logger.debug("Failed to setAutoCommit({}) due to {}: {}", + setAutoCommitValue, ex.getClass().getName(), ex.getMessage()); + } + } + try (final ResultSet resultSet = st.executeQuery(selectQuery)) { int fragmentIndex=0; // Max values will be updated in the state property map by the callback @@ -441,12 +494,22 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr } } catch (final SQLException e) { throw e; + } finally { + if (con.getAutoCommit() != originalAutoCommit) { + try { + con.setAutoCommit(originalAutoCommit); + logger.debug("Driver connection reset to original setAutoCommit({})", originalAutoCommit); + } catch (Exception ex) { + logger.debug("Failed to setAutoCommit({}) due to {}: {}", + originalAutoCommit, ex.getClass().getName(), ex.getMessage()); + } + } } session.transfer(resultSetFlowFiles, REL_SUCCESS); } catch (final ProcessException | SQLException e) { - logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); + logger.error("Unable to execute SQL select query {} due to {}", selectQuery, e); if (!resultSetFlowFiles.isEmpty()) { session.remove(resultSetFlowFiles); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 24123729b8..51fbc41409 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -109,6 +109,7 @@ public class QueryDatabaseTable extends AbstractQueryDatabaseTable { pds.add(INITIAL_LOAD_STRATEGY); pds.add(QUERY_TIMEOUT); pds.add(FETCH_SIZE); + pds.add(AUTO_COMMIT); pds.add(MAX_ROWS_PER_FLOW_FILE); pds.add(OUTPUT_BATCH_SIZE); pds.add(MAX_FRAGMENTS); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java index 5838d7e46c..2004649976 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java @@ -208,6 +208,7 @@ public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable { pds.add(INITIAL_LOAD_STRATEGY); pds.add(QUERY_TIMEOUT); pds.add(FETCH_SIZE); + pds.add(AUTO_COMMIT); pds.add(MAX_ROWS_PER_FLOW_FILE); pds.add(OUTPUT_BATCH_SIZE); pds.add(MAX_FRAGMENTS); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java index ab661998ed..65b43ff8b7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -211,6 +212,18 @@ public interface DatabaseAdapter { return Collections.singletonList(createTableStatement.toString()); } + /** + * Get the auto commit mode to use for reading from this database type. + * Most databases do not care which auto commit mode is used to read. + * For PostgreSQL it can make a difference. + * @param fetchSize The number of rows to retrieve at a time. Value of 0 means retrieve all rows at once. + * @return Optional.empty() if auto commit mode does not matter and can be left as is. + * Return true or false to indicate whether auto commit needs to be true or false for this database. + */ + default Optional getAutoCommitForReads(Integer fetchSize) { + return Optional.empty(); + } + default String getSQLForDataType(int sqlType) { return JDBCType.valueOf(sqlType).getName(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java index 5e48818600..8ba7b64b22 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static java.sql.Types.CHAR; @@ -160,6 +161,23 @@ public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter { .toString()); } + /** + * Get the auto commit mode to use for reading from this database type. + * For PostgreSQL databases, auto commit mode must be set to false to cause a fetchSize other than 0 to take effect. + * More Details of this behaviour in PostgreSQL driver can be found in https://jdbc.postgresql.org//documentation/head/query.html.") + * For PostgreSQL, if autocommit is TRUE, then fetch size is treated as 0 which loads all rows of the result set to memory at once. + * @param fetchSize The number of rows to retrieve at a time. Value of 0 means retrieve all rows at once. + * @return Optional.empty() if auto commit mode does not matter and can be left as is. + * Return true or false to indicate whether auto commit needs to be true or false for this database. + */ + @Override + public Optional getAutoCommitForReads(Integer fetchSize) { + if (fetchSize != null && fetchSize != 0) { + return Optional.of(Boolean.FALSE); + } + return Optional.empty(); + } + @Override public String getSQLForDataType(int sqlType) { switch (sqlType) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java new file mode 100644 index 0000000000..602a87e945 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.dbcp.DBCPConnectionPool; +import org.apache.nifi.dbcp.utils.DBCPProperties; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter; +import org.apache.nifi.reporting.InitializationException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +import java.io.IOException; +import java.sql.SQLException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class QueryDatabaseTableIT extends QueryDatabaseTableTest { + private static PostgreSQLContainer postgres; + + @BeforeAll + public static void setupBeforeClass() { + postgres = new PostgreSQLContainer<>("postgres:9.6.12") + .withInitScript("PutDatabaseRecordIT/create-person-table.sql"); + postgres.start(); + } + + @AfterAll + public static void cleanUpAfterClass() { + if (postgres != null) { + postgres.close(); + postgres = null; + } + } + + @Override + public DatabaseAdapter createDatabaseAdapter() { + return new PostgreSQLDatabaseAdapter(); + } + + @Override + public void createDbcpControllerService() throws InitializationException { + final DBCPConnectionPool connectionPool = new DBCPConnectionPool(); + runner.addControllerService("dbcp", connectionPool); + runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL, postgres.getJdbcUrl()); + runner.setProperty(connectionPool, DBCPProperties.DB_USER, postgres.getUsername()); + runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD, postgres.getPassword()); + runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName()); + runner.enableControllerService(connectionPool); + } + + @Test + public void testAddedRowsAutoCommitTrue() throws SQLException, IOException { + // this test in the base class is not valid for PostgreSQL so check the validation error message. + final AssertionError assertionError = assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue); + assertThat(assertionError.getMessage(), equalTo("Processor has 1 validation failures:\n" + + "'Set Auto Commit' validated against 'true' is invalid because 'Set Auto Commit' " + + "must be set to 'false' because 'PostgreSQL' Database Type requires it to be 'false'\n")); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java new file mode 100644 index 0000000000..4a98f0d48d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.dbcp.DBCPConnectionPool; +import org.apache.nifi.dbcp.utils.DBCPProperties; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter; +import org.apache.nifi.reporting.InitializationException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +import java.io.IOException; +import java.sql.SQLException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class QueryDatabaseTableRecordIT extends QueryDatabaseTableRecordTest { + private static PostgreSQLContainer postgres; + + @BeforeAll + public static void setupBeforeClass() { + postgres = new PostgreSQLContainer<>("postgres:9.6.12") + .withInitScript("PutDatabaseRecordIT/create-person-table.sql"); + postgres.start(); + } + + @AfterAll + public static void cleanUpAfterClass() { + if (postgres != null) { + postgres.close(); + postgres = null; + } + } + + @Override + public DatabaseAdapter createDatabaseAdapter() { + return new PostgreSQLDatabaseAdapter(); + } + + @Override + public void createDbcpControllerService() throws InitializationException { + final DBCPConnectionPool connectionPool = new DBCPConnectionPool(); + runner.addControllerService("dbcp", connectionPool); + runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL, postgres.getJdbcUrl()); + runner.setProperty(connectionPool, DBCPProperties.DB_USER, postgres.getUsername()); + runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD, postgres.getPassword()); + runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName()); + runner.enableControllerService(connectionPool); + } + + @Test + public void testAddedRowsAutoCommitTrue() throws SQLException, IOException { + // this test in the base class is not valid for PostgreSQL so check the validation error message. + final AssertionError assertionError = assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue); + assertThat(assertionError.getMessage(), equalTo("Processor has 1 validation failures:\n" + + "'Set Auto Commit' validated against 'true' is invalid because 'Set Auto Commit' " + + "must be set to 'false' because 'PostgreSQL' Database Type requires it to be 'false'\n")); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java index 91b288df16..7e7de06992 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java @@ -71,7 +71,7 @@ public class QueryDatabaseTableRecordTest { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); MockQueryDatabaseTableRecord processor; - private TestRunner runner; + protected TestRunner runner; private final static String DB_LOCATION = "target/db_qdt"; private DatabaseAdapter dbAdapter; private HashMap origDbAdapters; @@ -109,18 +109,25 @@ public class QueryDatabaseTableRecordTest { System.clearProperty("derby.stream.error.file"); } + public DatabaseAdapter createDatabaseAdapter() { + return new GenericDatabaseAdapter(); + } + + public void createDbcpControllerService() throws InitializationException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + } @BeforeEach public void setup() throws InitializationException, IOException { - final DBCPService dbcp = new DBCPServiceSimpleImpl(); - final Map dbcpProperties = new HashMap<>(); origDbAdapters = new HashMap<>(QueryDatabaseTableRecord.dbAdapters); - dbAdapter = new GenericDatabaseAdapter(); + dbAdapter = createDatabaseAdapter(); QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter); processor = new MockQueryDatabaseTableRecord(); runner = TestRunners.newTestRunner(processor); - runner.addControllerService("dbcp", dbcp, dbcpProperties); - runner.enableControllerService(dbcp); + createDbcpControllerService(); runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp"); runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, dbAdapter.getName()); runner.getStateManager().clear(Scope.CLUSTER); @@ -371,6 +378,82 @@ public class QueryDatabaseTableRecordTest { runner.clearTransferState(); } + @Test + public void testAddedRowsAutoCommitTrue() throws SQLException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2"); + runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2"); + runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "true"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME)); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + flowFile.assertAttributeEquals("record.count", "2"); + + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + flowFile.assertAttributeEquals("record.count", "1"); + } + + @Test + public void testAddedRowsAutoCommitFalse() throws SQLException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2"); + runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2"); + runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "false"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME)); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + flowFile.assertAttributeEquals("record.count", "2"); + + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + flowFile.assertAttributeEquals("record.count", "1"); + } + @Test public void testAddedRowsTwoTables() throws SQLException { @@ -1415,7 +1498,7 @@ public class QueryDatabaseTableRecordTest { } @Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTableRecord processor") - private static class MockQueryDatabaseTableRecord extends QueryDatabaseTableRecord { + protected static class MockQueryDatabaseTableRecord extends QueryDatabaseTableRecord { void putColumnType(String colName, Integer colType) { columnTypeMap.put(colName, colType); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 17ce74bebb..8f360eeb50 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -74,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class QueryDatabaseTableTest { MockQueryDatabaseTable processor; - private TestRunner runner; + protected TestRunner runner; private final static String DB_LOCATION = "target/db_qdt"; private DatabaseAdapter dbAdapter; private HashMap origDbAdapters; @@ -113,18 +113,25 @@ public class QueryDatabaseTableTest { System.clearProperty("derby.stream.error.file"); } + public DatabaseAdapter createDatabaseAdapter() { + return new GenericDatabaseAdapter(); + } + + public void createDbcpControllerService() throws InitializationException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + } @BeforeEach public void setup() throws InitializationException, IOException { - final DBCPService dbcp = new DBCPServiceSimpleImpl(); - final Map dbcpProperties = new HashMap<>(); origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters); - dbAdapter = new GenericDatabaseAdapter(); + dbAdapter = createDatabaseAdapter(); QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter); processor = new MockQueryDatabaseTable(); runner = TestRunners.newTestRunner(processor); - runner.addControllerService("dbcp", dbcp, dbcpProperties); - runner.enableControllerService(dbcp); + createDbcpControllerService(); runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp"); runner.setProperty(QueryDatabaseTable.DB_TYPE, dbAdapter.getName()); runner.getStateManager().clear(Scope.CLUSTER); @@ -373,6 +380,86 @@ public class QueryDatabaseTableTest { runner.clearTransferState(); } + @Test + public void testAddedRowsAutoCommitTrue() throws SQLException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2"); + runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); + runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "true"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME)); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(2, getNumberOfRecordsFromStream(in)); + + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + } + + @Test + public void testAddedRowsAutoCommitFalse() throws SQLException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2"); + runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); + runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "false"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME)); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(2, getNumberOfRecordsFromStream(in)); + + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + } + @Test public void testAddedRowsTwoTables() throws ClassNotFoundException, SQLException, InitializationException, IOException { @@ -1461,7 +1548,7 @@ public class QueryDatabaseTableTest { } @Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTable processor") - private static class MockQueryDatabaseTable extends QueryDatabaseTable { + protected static class MockQueryDatabaseTable extends QueryDatabaseTable { void putColumnType(String colName, Integer colType) { columnTypeMap.put(colName, colType); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java index ea10621867..6d866ef90f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -41,6 +42,21 @@ public class TestPostgreSQLDatabaseAdapter { assertTrue(testSubject.supportsUpsert(), testSubject.getClass().getSimpleName() + " should support upsert"); } + @Test + public void getAutoCommitForReadsFetchSizeNull() { + assertEquals(Optional.empty(), testSubject.getAutoCommitForReads(null)); + } + + @Test + public void getAutoCommitForReadsFetchSizeZero() { + assertEquals(Optional.empty(), testSubject.getAutoCommitForReads(0)); + } + + @Test + public void getAutoCommitForReadsFetchSizeNonZero() { + assertEquals(Optional.of(Boolean.FALSE), testSubject.getAutoCommitForReads(1)); + } + @Test public void testGetUpsertStatementWithNullTableName() { testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));