Add auto commit property to QueryDatabaseTable and QueryDatabaseTableRecord processors to allow disabling auto commit so PostgreSQL Fetch Size will work

NIFI-1931 Add proper default value for auto commit (false) to PostgreSQLDatabaseAdapter to allow FETCH_SIZE to be honored on reads.

NIFI-1931 Added customValidate code to check the auto commit property setting against the db adapter's required auto commit setting and give validation error message if they do not match.

NIFI-1931 Added automated test to check the Auto Commit customValidate error message.

NIFI-1931 remove clearDefaultValue() because it is not needed since required = false already defaults it to null.

This closes #8534

Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
Jim Steinebrey 2024-03-19 11:54:38 -04:00 committed by Matt Burgess
parent 258715539e
commit 08ff54f5fb
10 changed files with 456 additions and 18 deletions

View File

@ -90,16 +90,34 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
"TRANSACTION_SERIALIZABLE"
);
private static final String FETCH_SIZE_NAME = "Fetch Size";
private static final String AUTO_COMMIT_NAME = "Set Auto Commit";
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
.name(FETCH_SIZE_NAME)
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+ "honored and/or exact. If the value specified is zero, then the hint is ignored. "
+ "If using PostgreSQL, then '" + AUTO_COMMIT_NAME + "' must be equal to 'false' to cause '" + FETCH_SIZE_NAME + "' to take effect.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
// Optional property: when left unset, the DB connection's auto commit mode is not touched.
public static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
        .name(AUTO_COMMIT_NAME)
        .description("Allows enabling or disabling the auto commit functionality of the DB connection. Default value is 'No value set'. " +
                "'No value set' will leave the db connection's auto commit mode unchanged. " +
                "For some JDBC drivers such as PostgreSQL driver, it is required to disable the auto commit functionality " +
                "to get the '" + FETCH_SIZE_NAME + "' setting to take effect. " +
                "When auto commit is enabled, PostgreSQL driver ignores '" + FETCH_SIZE_NAME + "' setting and loads all rows of the result set to memory at once. " +
                "This could lead to a large amount of memory usage when executing queries which fetch large data sets. " +
                "More details of this behaviour in PostgreSQL driver can be found in https://jdbc.postgresql.org/documentation/head/query.html.")
        .allowableValues("true", "false")
        .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
        .required(false)
        .build();
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.name("qdbt-max-rows")
.displayName("Max Rows Per Flow File")
@ -196,6 +214,23 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
.build());
}
final Boolean propertyAutoCommit = validationContext.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean();
final Integer fetchSize = validationContext.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final DatabaseAdapter dbAdapter = dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
final Boolean adapterAutoCommit = dbAdapter == null
? null
: dbAdapter.getAutoCommitForReads(fetchSize).orElse(null);
if (adapterAutoCommit != null && propertyAutoCommit != null
&& propertyAutoCommit != adapterAutoCommit ) {
results.add(new ValidationResult.Builder().valid(false)
.subject(AUTO_COMMIT.getDisplayName())
.input(String.valueOf(propertyAutoCommit))
.explanation(String.format("'%s' must be set to '%s' because '%s' %s requires it to be '%s'",
AUTO_COMMIT.getDisplayName(), adapterAutoCommit,
dbAdapter.getName(), DB_TYPE.getDisplayName(), adapterAutoCommit))
.build());
}
return results;
}
@ -304,7 +339,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
}
}
} catch (final Exception e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectMaxQuery, e});
logger.error("Unable to execute SQL select query {} due to {}", selectMaxQuery, e);
context.yield();
}
}
@ -343,6 +378,24 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[] { selectQuery });
}
final boolean originalAutoCommit = con.getAutoCommit();
final Boolean propertyAutoCommitValue = context.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean();
// If user sets AUTO_COMMIT property to non-null (i.e. true or false), then the property value overrides the dbAdapter's value
final Boolean setAutoCommitValue =
dbAdapter == null || propertyAutoCommitValue != null
? propertyAutoCommitValue
: dbAdapter.getAutoCommitForReads(fetchSize).orElse(null);
if (setAutoCommitValue != null && originalAutoCommit != setAutoCommitValue) {
try {
con.setAutoCommit(setAutoCommitValue);
logger.debug("Driver connection changed to setAutoCommit({})", setAutoCommitValue);
} catch (Exception ex) {
logger.debug("Failed to setAutoCommit({}) due to {}: {}",
setAutoCommitValue, ex.getClass().getName(), ex.getMessage());
}
}
try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
int fragmentIndex=0;
// Max values will be updated in the state property map by the callback
@ -441,12 +494,22 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
}
} catch (final SQLException e) {
throw e;
} finally {
if (con.getAutoCommit() != originalAutoCommit) {
try {
con.setAutoCommit(originalAutoCommit);
logger.debug("Driver connection reset to original setAutoCommit({})", originalAutoCommit);
} catch (Exception ex) {
logger.debug("Failed to setAutoCommit({}) due to {}: {}",
originalAutoCommit, ex.getClass().getName(), ex.getMessage());
}
}
}
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
logger.error("Unable to execute SQL select query {} due to {}", selectQuery, e);
if (!resultSetFlowFiles.isEmpty()) {
session.remove(resultSetFlowFiles);
}

View File

@ -109,6 +109,7 @@ public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(AUTO_COMMIT);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
pds.add(MAX_FRAGMENTS);

View File

@ -208,6 +208,7 @@ public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(AUTO_COMMIT);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
pds.add(MAX_FRAGMENTS);

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
@ -211,6 +212,18 @@ public interface DatabaseAdapter {
return Collections.singletonList(createTableStatement.toString());
}
/**
 * Get the auto commit mode to use for reading from this database type,
 * or {@link java.util.Optional#empty()} when the mode does not matter.
 * Most databases do not care which auto commit mode is used to read.
 * For PostgreSQL it can make a difference (per its adapter, a non-zero fetch size
 * is only honored when auto commit is disabled).
 * @param fetchSize The number of rows to retrieve at a time. Value of 0 means retrieve all rows at once.
 * @return Optional.empty() if auto commit mode does not matter and can be left as is.
 * Return true or false to indicate whether auto commit needs to be true or false for this database.
 */
default Optional<Boolean> getAutoCommitForReads(Integer fetchSize) {
    // Default: no requirement — adapters such as PostgreSQL override this.
    return Optional.empty();
}
default String getSQLForDataType(int sqlType) {
return JDBCType.valueOf(sqlType).getName();
}

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.sql.Types.CHAR;
@ -160,6 +161,23 @@ public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter {
.toString());
}
/**
 * Get the auto commit mode required by PostgreSQL for reads.
 * The PostgreSQL JDBC driver only honors a non-zero fetch size when auto commit is disabled;
 * when auto commit is enabled it treats the fetch size as 0 and loads the entire
 * result set into memory at once.
 * More details of this behaviour in the PostgreSQL driver can be found in
 * https://jdbc.postgresql.org/documentation/head/query.html
 * @param fetchSize The number of rows to retrieve at a time. Value of 0 means retrieve all rows at once.
 * @return Optional.of(Boolean.FALSE) when a non-zero fetch size is requested,
 * otherwise Optional.empty() to indicate that the auto commit mode can be left as is.
 */
@Override
public Optional<Boolean> getAutoCommitForReads(Integer fetchSize) {
    final boolean nonZeroFetchSizeRequested = fetchSize != null && fetchSize != 0;
    return nonZeroFetchSizeRequested ? Optional.of(Boolean.FALSE) : Optional.empty();
}
@Override
public String getSQLForDataType(int sqlType) {
switch (sqlType) {

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.dbcp.DBCPConnectionPool;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import java.io.IOException;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
 * Integration test that re-runs the QueryDatabaseTable tests against a real
 * PostgreSQL database managed by Testcontainers.
 */
public class QueryDatabaseTableIT extends QueryDatabaseTableTest {
    private static PostgreSQLContainer<?> postgres;

    @BeforeAll
    public static void setupBeforeClass() {
        // Start a throwaway PostgreSQL instance seeded with the person table.
        final PostgreSQLContainer<?> container = new PostgreSQLContainer<>("postgres:9.6.12");
        postgres = container.withInitScript("PutDatabaseRecordIT/create-person-table.sql");
        postgres.start();
    }

    @AfterAll
    public static void cleanUpAfterClass() {
        if (postgres == null) {
            return;
        }
        postgres.close();
        postgres = null;
    }

    @Override
    public DatabaseAdapter createDatabaseAdapter() {
        return new PostgreSQLDatabaseAdapter();
    }

    @Override
    public void createDbcpControllerService() throws InitializationException {
        // Point the "dbcp" service at the Testcontainers-managed database.
        final DBCPConnectionPool pool = new DBCPConnectionPool();
        runner.addControllerService("dbcp", pool);
        runner.setProperty(pool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName());
        runner.setProperty(pool, DBCPProperties.DATABASE_URL, postgres.getJdbcUrl());
        runner.setProperty(pool, DBCPProperties.DB_USER, postgres.getUsername());
        runner.setProperty(pool, DBCPProperties.DB_PASSWORD, postgres.getPassword());
        runner.enableControllerService(pool);
    }

    @Test
    public void testAddedRowsAutoCommitTrue() throws SQLException, IOException {
        // this test in the base class is not valid for PostgreSQL so check the validation error message.
        final String expectedMessage = "Processor has 1 validation failures:\n" +
                "'Set Auto Commit' validated against 'true' is invalid because 'Set Auto Commit' " +
                "must be set to 'false' because 'PostgreSQL' Database Type requires it to be 'false'\n";
        final AssertionError validationFailure = assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
        assertThat(validationFailure.getMessage(), equalTo(expectedMessage));
    }
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.dbcp.DBCPConnectionPool;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import java.io.IOException;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
 * Integration test that re-runs the QueryDatabaseTableRecord tests against a real
 * PostgreSQL database managed by Testcontainers.
 */
public class QueryDatabaseTableRecordIT extends QueryDatabaseTableRecordTest {
    private static PostgreSQLContainer<?> postgres;

    @BeforeAll
    public static void setupBeforeClass() {
        // Start a throwaway PostgreSQL instance seeded with the person table.
        final PostgreSQLContainer<?> container = new PostgreSQLContainer<>("postgres:9.6.12");
        postgres = container.withInitScript("PutDatabaseRecordIT/create-person-table.sql");
        postgres.start();
    }

    @AfterAll
    public static void cleanUpAfterClass() {
        if (postgres == null) {
            return;
        }
        postgres.close();
        postgres = null;
    }

    @Override
    public DatabaseAdapter createDatabaseAdapter() {
        return new PostgreSQLDatabaseAdapter();
    }

    @Override
    public void createDbcpControllerService() throws InitializationException {
        // Point the "dbcp" service at the Testcontainers-managed database.
        final DBCPConnectionPool pool = new DBCPConnectionPool();
        runner.addControllerService("dbcp", pool);
        runner.setProperty(pool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName());
        runner.setProperty(pool, DBCPProperties.DATABASE_URL, postgres.getJdbcUrl());
        runner.setProperty(pool, DBCPProperties.DB_USER, postgres.getUsername());
        runner.setProperty(pool, DBCPProperties.DB_PASSWORD, postgres.getPassword());
        runner.enableControllerService(pool);
    }

    @Test
    public void testAddedRowsAutoCommitTrue() throws SQLException, IOException {
        // this test in the base class is not valid for PostgreSQL so check the validation error message.
        final String expectedMessage = "Processor has 1 validation failures:\n" +
                "'Set Auto Commit' validated against 'true' is invalid because 'Set Auto Commit' " +
                "must be set to 'false' because 'PostgreSQL' Database Type requires it to be 'false'\n";
        final AssertionError validationFailure = assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
        assertThat(validationFailure.getMessage(), equalTo(expectedMessage));
    }
}

View File

@ -71,7 +71,7 @@ public class QueryDatabaseTableRecordTest {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
MockQueryDatabaseTableRecord processor;
private TestRunner runner;
protected TestRunner runner;
private final static String DB_LOCATION = "target/db_qdt";
private DatabaseAdapter dbAdapter;
private HashMap<String, DatabaseAdapter> origDbAdapters;
@ -109,18 +109,25 @@ public class QueryDatabaseTableRecordTest {
System.clearProperty("derby.stream.error.file");
}
// Factory for the adapter under test; integration-test subclasses override this
// to supply a different adapter (e.g. PostgreSQL).
public DatabaseAdapter createDatabaseAdapter() {
    return new GenericDatabaseAdapter();
}
// Registers the "dbcp" controller service used by the processor under test;
// integration-test subclasses override this to point at a real database.
public void createDbcpControllerService() throws InitializationException {
    final Map<String, String> serviceProperties = new HashMap<>();
    final DBCPService dbcpService = new DBCPServiceSimpleImpl();
    runner.addControllerService("dbcp", dbcpService, serviceProperties);
    runner.enableControllerService(dbcpService);
}
@BeforeEach
public void setup() throws InitializationException, IOException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
origDbAdapters = new HashMap<>(QueryDatabaseTableRecord.dbAdapters);
dbAdapter = new GenericDatabaseAdapter();
dbAdapter = createDatabaseAdapter();
QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter);
processor = new MockQueryDatabaseTableRecord();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
createDbcpControllerService();
runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, dbAdapter.getName());
runner.getStateManager().clear(Scope.CLUSTER);
@ -371,6 +378,82 @@ public class QueryDatabaseTableRecordTest {
runner.clearTransferState();
}
@Test
public void testAddedRowsAutoCommitTrue() throws SQLException, IOException {
    // Seed the test table. Use try-with-resources so the JDBC statement and
    // connection are released even if a setup step fails (original left them open).
    try (final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
            final Statement stmt = con.createStatement()) {
        try {
            stmt.execute("drop table TEST_QUERY_DB_TABLE");
        } catch (final SQLException sqle) {
            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
        }
        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
    }

    runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
    runner.setIncomingConnection(false);
    runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
    runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
    runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
    // Explicitly enable auto commit; the PostgreSQL IT subclass expects this to fail
    // validation, but for other adapters the fetch must still succeed.
    // (Use the inherited constant via QueryDatabaseTableRecord for consistency.)
    runner.setProperty(QueryDatabaseTableRecord.AUTO_COMMIT, "true");
    runner.run();

    runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);

    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
    assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
    // assertEquals takes the expected value first; the original had the arguments swapped.
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    flowFile.assertAttributeEquals("record.count", "2");

    flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    flowFile.assertAttributeEquals("record.count", "1");
}
@Test
public void testAddedRowsAutoCommitFalse() throws SQLException, IOException {
    // Seed the test table. Use try-with-resources so the JDBC statement and
    // connection are released even if a setup step fails (original left them open).
    try (final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
            final Statement stmt = con.createStatement()) {
        try {
            stmt.execute("drop table TEST_QUERY_DB_TABLE");
        } catch (final SQLException sqle) {
            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
        }
        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
    }

    runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
    runner.setIncomingConnection(false);
    runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
    runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
    runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
    // Explicitly disable auto commit — the mode PostgreSQL requires for FETCH_SIZE to work.
    // (Use the inherited constant via QueryDatabaseTableRecord for consistency.)
    runner.setProperty(QueryDatabaseTableRecord.AUTO_COMMIT, "false");
    runner.run();

    runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);

    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
    assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
    // assertEquals takes the expected value first; the original had the arguments swapped.
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    flowFile.assertAttributeEquals("record.count", "2");

    flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    flowFile.assertAttributeEquals("record.count", "1");
}
@Test
public void testAddedRowsTwoTables() throws SQLException {
@ -1415,7 +1498,7 @@ public class QueryDatabaseTableRecordTest {
}
@Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTableRecord processor")
private static class MockQueryDatabaseTableRecord extends QueryDatabaseTableRecord {
protected static class MockQueryDatabaseTableRecord extends QueryDatabaseTableRecord {
void putColumnType(String colName, Integer colType) {
columnTypeMap.put(colName, colType);
}

View File

@ -74,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class QueryDatabaseTableTest {
MockQueryDatabaseTable processor;
private TestRunner runner;
protected TestRunner runner;
private final static String DB_LOCATION = "target/db_qdt";
private DatabaseAdapter dbAdapter;
private HashMap<String, DatabaseAdapter> origDbAdapters;
@ -113,18 +113,25 @@ public class QueryDatabaseTableTest {
System.clearProperty("derby.stream.error.file");
}
// Factory for the adapter under test; integration-test subclasses override this
// to supply a different adapter (e.g. PostgreSQL).
public DatabaseAdapter createDatabaseAdapter() {
    return new GenericDatabaseAdapter();
}
// Registers the "dbcp" controller service used by the processor under test;
// integration-test subclasses override this to point at a real database.
public void createDbcpControllerService() throws InitializationException {
    final Map<String, String> serviceProperties = new HashMap<>();
    final DBCPService dbcpService = new DBCPServiceSimpleImpl();
    runner.addControllerService("dbcp", dbcpService, serviceProperties);
    runner.enableControllerService(dbcpService);
}
@BeforeEach
public void setup() throws InitializationException, IOException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters);
dbAdapter = new GenericDatabaseAdapter();
dbAdapter = createDatabaseAdapter();
QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
processor = new MockQueryDatabaseTable();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
createDbcpControllerService();
runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
runner.setProperty(QueryDatabaseTable.DB_TYPE, dbAdapter.getName());
runner.getStateManager().clear(Scope.CLUSTER);
@ -373,6 +380,86 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
}
@Test
public void testAddedRowsAutoCommitTrue() throws SQLException, IOException {
    // Seed the test table. Use try-with-resources so the JDBC statement and
    // connection are released even if a setup step fails (original left them open).
    try (final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
            final Statement stmt = con.createStatement()) {
        try {
            stmt.execute("drop table TEST_QUERY_DB_TABLE");
        } catch (final SQLException sqle) {
            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
        }
        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
    }

    runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
    runner.setIncomingConnection(false);
    runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
    runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
    runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
    // Explicitly enable auto commit; the PostgreSQL IT subclass expects this to fail
    // validation, but for other adapters the fetch must still succeed.
    runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "true");
    runner.run();

    runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);

    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
    assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
    // assertEquals takes the expected value first; the original had the arguments swapped.
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
    assertEquals(2, getNumberOfRecordsFromStream(in));

    flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    in = new ByteArrayInputStream(flowFile.toByteArray());
    assertEquals(1, getNumberOfRecordsFromStream(in));
}
@Test
public void testAddedRowsAutoCommitFalse() throws SQLException, IOException {
    // Seed the test table. Use try-with-resources so the JDBC statement and
    // connection are released even if a setup step fails (original left them open).
    try (final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
            final Statement stmt = con.createStatement()) {
        try {
            stmt.execute("drop table TEST_QUERY_DB_TABLE");
        } catch (final SQLException sqle) {
            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
        }
        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
    }

    runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
    runner.setIncomingConnection(false);
    runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
    runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
    runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
    // Explicitly disable auto commit — the mode PostgreSQL requires for FETCH_SIZE to work.
    runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "false");
    runner.run();

    runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);

    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
    assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
    // assertEquals takes the expected value first; the original had the arguments swapped.
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
    assertEquals(2, getNumberOfRecordsFromStream(in));

    flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
    assertEquals("2", flowFile.getAttribute("maxvalue.id"));
    in = new ByteArrayInputStream(flowFile.toByteArray());
    assertEquals(1, getNumberOfRecordsFromStream(in));
}
@Test
public void testAddedRowsTwoTables() throws ClassNotFoundException, SQLException, InitializationException, IOException {
@ -1461,7 +1548,7 @@ public class QueryDatabaseTableTest {
}
@Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTable processor")
private static class MockQueryDatabaseTable extends QueryDatabaseTable {
protected static class MockQueryDatabaseTable extends QueryDatabaseTable {
void putColumnType(String colName, Integer colType) {
columnTypeMap.put(colName, colType);
}

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -41,6 +42,21 @@ public class TestPostgreSQLDatabaseAdapter {
assertTrue(testSubject.supportsUpsert(), testSubject.getClass().getSimpleName() + " should support upsert");
}
@Test
public void getAutoCommitForReadsFetchSizeNull() {
    // A null fetch size imposes no auto commit requirement.
    final Optional<Boolean> autoCommit = testSubject.getAutoCommitForReads(null);
    assertEquals(Optional.empty(), autoCommit);
}
@Test
public void getAutoCommitForReadsFetchSizeZero() {
    // A fetch size of 0 means "all rows at once", so no auto commit requirement.
    final Optional<Boolean> autoCommit = testSubject.getAutoCommitForReads(0);
    assertEquals(Optional.empty(), autoCommit);
}
@Test
public void getAutoCommitForReadsFetchSizeNonZero() {
    // A non-zero fetch size requires auto commit to be disabled on PostgreSQL.
    final Optional<Boolean> autoCommit = testSubject.getAutoCommitForReads(1);
    assertEquals(Optional.of(Boolean.FALSE), autoCommit);
}
@Test
public void testGetUpsertStatementWithNullTableName() {
testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));