NIFI-12993 Add auto commit feature and add batch processing for the sql stmt type

NIFI-12993 Removed underscore from a few local variables.

NIFI-12993 Refactored unit tests into a single java file

NIFI-12993 Changed Optional.isEmpty() ro !Optional.isPresent() so it can work in Nifi 1.x Java 8

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8597
This commit is contained in:
Jim Steinebrey 2024-04-02 14:51:34 -04:00 committed by Matt Burgess
parent b5943941ba
commit 1514890371
2 changed files with 562 additions and 155 deletions

View File

@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.PropertyDescriptor.Builder;
@ -376,6 +377,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build(); .build();
static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
.name("database-session-autocommit")
.displayName("Database Session AutoCommit")
.description("The autocommit mode to set on the database connection being used. If set to false, the operation(s) will be explicitly committed or rolled back "
+ "(based on success or failure respectively). If set to true, the driver/database automatically handles the commit/rollback.")
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final PropertyDescriptor DB_TYPE; static final PropertyDescriptor DB_TYPE;
protected static final Map<String, DatabaseAdapter> dbAdapters; protected static final Map<String, DatabaseAdapter> dbAdapters;
@ -431,6 +443,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
pds.add(TABLE_SCHEMA_CACHE_SIZE); pds.add(TABLE_SCHEMA_CACHE_SIZE);
pds.add(MAX_BATCH_SIZE); pds.add(MAX_BATCH_SIZE);
pds.add(AUTO_COMMIT);
propDescriptors = Collections.unmodifiableList(pds); propDescriptors = Collections.unmodifiableList(pds);
} }
@ -466,9 +479,41 @@ public class PutDatabaseRecord extends AbstractProcessor {
); );
} }
final boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
final boolean rollbackOnFailure = validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
if (autoCommit && rollbackOnFailure) {
validationResults.add(new ValidationResult.Builder()
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
.explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'. "
+ "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'",
RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName()))
.build());
}
if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'."
+ " Batch size equal to zero executes all statements in a single transaction"
+ " which allows automatic rollback to revert all statements if an error occurs",
MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName());
validationResults.add(new ValidationResult.Builder()
.subject(MAX_BATCH_SIZE.getDisplayName())
.explanation(explanation)
.build());
}
return validationResults; return validationResults;
} }
private boolean isMaxBatchSizeHardcodedToZero(ValidationContext validationContext) {
try {
return !validationContext.getProperty(MAX_BATCH_SIZE).isExpressionLanguagePresent()
&& 0 == validationContext.getProperty(MAX_BATCH_SIZE).asInteger();
} catch (Exception ex) {
return false;
}
}
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
databaseAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); databaseAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
@ -490,6 +535,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue); dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
} }
@OnUnscheduled
public final void onUnscheduled() {
supportsBatchUpdates = Optional.empty();
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get(); FlowFile flowFile = session.get();
@ -498,31 +548,40 @@ public class PutDatabaseRecord extends AbstractProcessor {
} }
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
Optional<Connection> connectionHolder = Optional.empty();
Connection connection = null;
boolean originalAutoCommit = false; boolean originalAutoCommit = false;
try { try {
final Connection connection = dbcpService.getConnection(flowFile.getAttributes()); connection = dbcpService.getConnection(flowFile.getAttributes());
connectionHolder = Optional.of(connection);
originalAutoCommit = connection.getAutoCommit(); originalAutoCommit = connection.getAutoCommit();
if (originalAutoCommit) { final boolean autoCommit = context.getProperty(AUTO_COMMIT).asBoolean();
if (originalAutoCommit != autoCommit) {
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(autoCommit);
} catch (SQLFeatureNotSupportedException sfnse) { } catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(false) not supported by this driver"); getLogger().debug(String.format("setAutoCommit(%s) not supported by this driver", autoCommit), sfnse);
} }
} }
putToDatabase(context, session, flowFile, connection); putToDatabase(context, session, flowFile, connection);
// Only commit the connection if auto-commit is false // Only commit the connection if auto-commit is false
if (!originalAutoCommit) { if (!connection.getAutoCommit()) {
connection.commit(); connection.commit();
} }
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection)); session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection));
} catch (final Exception e) { } catch (final Exception e) {
routeOnException(context, session, connection, e, flowFile);
} finally {
closeConnection(connection, originalAutoCommit);
}
}
private void routeOnException(final ProcessContext context, final ProcessSession session,
Connection connection, Exception e, FlowFile flowFile) {
// When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again // When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again
// might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work. // might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work.
final Relationship relationship; final Relationship relationship;
@ -544,7 +603,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
session.transfer(flowFile, relationship); session.transfer(flowFile, relationship);
} }
connectionHolder.ifPresent(connection -> { rollbackConnection(connection);
}
private void rollbackConnection(Connection connection) {
if (connection != null) {
try { try {
if (!connection.getAutoCommit()) { if (!connection.getAutoCommit()) {
connection.rollback(); connection.rollback();
@ -552,30 +615,31 @@ public class PutDatabaseRecord extends AbstractProcessor {
} catch (final Exception rollbackException) { } catch (final Exception rollbackException) {
getLogger().error("Failed to rollback JDBC transaction", rollbackException); getLogger().error("Failed to rollback JDBC transaction", rollbackException);
} }
});
} finally {
if (originalAutoCommit) {
connectionHolder.ifPresent(connection -> {
try {
connection.setAutoCommit(true);
} catch (final Exception autoCommitException) {
getLogger().warn("Failed to set auto-commit back to true on connection", autoCommitException);
} }
});
} }
connectionHolder.ifPresent(connection -> { private void closeConnection(Connection connection, boolean originalAutoCommit) {
if (connection != null) {
try { try {
if (originalAutoCommit != connection.getAutoCommit()) {
connection.setAutoCommit(originalAutoCommit);
}
} catch (final Exception autoCommitException) {
getLogger().warn(String.format("Failed to set auto-commit back to %s on connection", originalAutoCommit), autoCommitException);
}
try {
if (!connection.isClosed()) {
connection.close(); connection.close();
}
} catch (final Exception closeException) { } catch (final Exception closeException) {
getLogger().warn("Failed to close database connection", closeException); getLogger().warn("Failed to close database connection", closeException);
} }
});
} }
} }
private void executeSQL(final ProcessContext context, final ProcessSession session, final FlowFile flowFile,
private void executeSQL(final ProcessContext context, final FlowFile flowFile, final Connection connection, final RecordReader recordReader) final Connection connection, final RecordReader recordReader)
throws IllegalArgumentException, MalformedRecordException, IOException, SQLException { throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
final RecordSchema recordSchema = recordReader.getSchema(); final RecordSchema recordSchema = recordReader.getSchema();
@ -602,23 +666,66 @@ public class PutDatabaseRecord extends AbstractProcessor {
} }
} }
Record currentRecord; final ComponentLog log = getLogger();
while ((currentRecord = recordReader.nextRecord()) != null) { final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
// Do not use batch if set to batch size of 1 because that is similar to not using batching.
// Also do not use batches if the connection does not support batching.
boolean useBatch = maxBatchSize != 1 && isSupportBatchUpdates(connection);
int currentBatchSize = 0;
int batchIndex = 0;
boolean isFirstRecord = true;
Record nextRecord = recordReader.nextRecord();
while (nextRecord != null) {
Record currentRecord = nextRecord;
final String sql = currentRecord.getAsString(sqlField); final String sql = currentRecord.getAsString(sqlField);
nextRecord = recordReader.nextRecord();
if (sql == null || StringUtils.isEmpty(sql)) { if (sql == null || StringUtils.isEmpty(sql)) {
throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile)); throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
} }
// Execute the statement(s) as-is final String[] sqlStatements;
if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) { if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) {
final String regex = "(?<!\\\\);"; final String regex = "(?<!\\\\);";
final String[] sqlStatements = (sql).split(regex); sqlStatements = (sql).split(regex);
} else {
sqlStatements = new String[] { sql };
}
if (isFirstRecord) {
// If there is only one sql statement to process, then do not use batching.
if (nextRecord == null && sqlStatements.length == 1) {
useBatch = false;
}
isFirstRecord = false;
}
for (String sqlStatement : sqlStatements) { for (String sqlStatement : sqlStatements) {
if (useBatch) {
currentBatchSize++;
statement.addBatch(sqlStatement);
} else {
statement.execute(sqlStatement); statement.execute(sqlStatement);
} }
} else {
statement.execute(sql);
} }
if (useBatch && maxBatchSize > 0 && currentBatchSize >= maxBatchSize) {
batchIndex++;
log.debug("Executing batch with last query {} because batch reached max size %s for {}; batch index: {}; batch size: {}",
sql, maxBatchSize, flowFile, batchIndex, currentBatchSize);
statement.executeBatch();
session.adjustCounter("Batches Executed", 1, false);
currentBatchSize = 0;
}
}
if (useBatch && currentBatchSize > 0) {
batchIndex++;
log.debug("Executing last batch because last statement reached for {}; batch index: {}; batch size: {}",
flowFile, batchIndex, currentBatchSize);
statement.executeBatch();
session.adjustCounter("Batches Executed", 1, false);
} }
} }
} }
@ -730,7 +837,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes(); final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql(); final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
if (currentBatchSize > 0 && ps != lastPreparedStatement && lastPreparedStatement != null) { if (ps != lastPreparedStatement && lastPreparedStatement != null) {
batchIndex++; batchIndex++;
log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}", log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
sql, flowFile, fieldIndexes, batchIndex, currentBatchSize); sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
@ -1032,7 +1139,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger()); final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
if (SQL_TYPE.equalsIgnoreCase(statementType)) { if (SQL_TYPE.equalsIgnoreCase(statementType)) {
executeSQL(context, flowFile, connection, recordReader); executeSQL(context, session, flowFile, connection, recordReader);
} else { } else {
final DMLSettings settings = new DMLSettings(context); final DMLSettings settings = new DMLSettings(context);
executeDML(context, session, flowFile, connection, recordReader, statementType, settings); executeDML(context, session, flowFile, connection, recordReader, statementType, settings);
@ -1493,6 +1600,28 @@ public class PutDatabaseRecord extends AbstractProcessor {
return normalizedKeyColumnNames; return normalizedKeyColumnNames;
} }
private Optional<Boolean> supportsBatchUpdates = Optional.empty();
private void initializeSupportBatchUpdates(Connection connection) {
if (!supportsBatchUpdates.isPresent()) {
try {
final DatabaseMetaData dmd = connection.getMetaData();
supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
getLogger().debug(String.format("Connection supportsBatchUpdates is %s",
supportsBatchUpdates.orElse(Boolean.FALSE)));
} catch (Exception ex) {
supportsBatchUpdates = Optional.of(Boolean.FALSE);
getLogger().debug(String.format("Exception while testing if connection supportsBatchUpdates due to %s - %s",
ex.getClass().getName(), ex.getMessage()));
}
}
}
private boolean isSupportBatchUpdates(Connection connection) {
initializeSupportBatchUpdates(connection);
return supportsBatchUpdates.orElse(Boolean.FALSE);
}
static class SchemaKey { static class SchemaKey {
private final String catalog; private final String catalog;
private final String schemaName; private final String schemaName;

View File

@ -37,9 +37,11 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers; import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -68,6 +70,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -86,7 +89,51 @@ import static org.mockito.Mockito.when;
public class PutDatabaseRecordTest { public class PutDatabaseRecordTest {
private static String DBCP_SERVICE_ID = "dbcp"; private enum TestCaseEnum {
// ENABLED means to use that test case in the parameterized tests.
// DISABLED test cases are used for single-run tests which are not parameterized
DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
// If autoCommit equals true, then rollbackOnFailure must be false AND batchSize must equal 0
AUTO_COMMIT_0(ENABLED, new TestCase(true, false, 0));
private final boolean enabled;
private final TestCase testCase;
TestCaseEnum(boolean enabled, TestCase t) {
this.enabled = enabled;
this.testCase = t;
}
public boolean isEnabled() {
return enabled;
}
public TestCase getTestCase() {
return testCase;
}
}
static Stream<Arguments> getTestCases() {
return Arrays.stream(TestCaseEnum.values())
.filter(TestCaseEnum::isEnabled)
.map(TestCaseEnum::getTestCase)
.map(Arguments::of);
}
private final static boolean ENABLED = true;
private final static boolean DISABLED = false;
private final static String DBCP_SERVICE_ID = "dbcp";
private static final String CONNECTION_FAILED = "Connection Failed"; private static final String CONNECTION_FAILED = "Connection Failed";
@ -141,8 +188,7 @@ public class PutDatabaseRecordTest {
System.clearProperty("derby.stream.error.file"); System.clearProperty("derby.stream.error.file");
} }
@BeforeEach private void setRunner(TestCase testCase) throws InitializationException {
public void setRunner() throws Exception {
processor = new PutDatabaseRecord(); processor = new PutDatabaseRecord();
//Mock the DBCP Controller Service so we can control the Results //Mock the DBCP Controller Service so we can control the Results
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION)); dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
@ -153,10 +199,15 @@ public class PutDatabaseRecordTest {
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties); runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp); runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, testCase.getRollbackOnFailureAsString());
runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, testCase.getBatchSizeAsString());
} }
@Test @Test
public void testGetConnectionFailure() throws InitializationException { public void testGetConnectionFailure() throws InitializationException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService(PARSER_ID, parser); runner.addControllerService(PARSER_ID, parser);
runner.enableControllerService(parser); runner.enableControllerService(parser);
@ -175,6 +226,8 @@ public class PutDatabaseRecordTest {
@Test @Test
public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException { public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION); dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION);
final Map<String, String> dbcpProperties = new HashMap<>(); final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
@ -211,6 +264,8 @@ public class PutDatabaseRecordTest {
@Test @Test
public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException { public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
// Need to override the @Before method with a new processor that behaves badly // Need to override the @Before method with a new processor that behaves badly
processor = new PutDatabaseRecordUnmatchedField(); processor = new PutDatabaseRecordUnmatchedField();
//Mock the DBCP Controller Service so we can control the Results //Mock the DBCP Controller Service so we can control the Results
@ -257,7 +312,8 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testGeneratePreparedStatements() throws SQLException, MalformedRecordException { public void testGeneratePreparedStatements() throws InitializationException, SQLException, MalformedRecordException {
setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
final List<RecordField> fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()), final List<RecordField> fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()),
@ -295,7 +351,8 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testGeneratePreparedStatementsFailUnmatchedField() { public void testGeneratePreparedStatementsFailUnmatchedField() throws InitializationException {
setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
final List<RecordField> fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()), final List<RecordField> fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()),
@ -340,8 +397,11 @@ public class PutDatabaseRecordTest {
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage()); assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage());
} }
@Test @ParameterizedTest()
void testInsert() throws InitializationException, ProcessException, SQLException, IOException { @MethodSource("getTestCases")
public void testInsert(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -406,7 +466,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException { public void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -471,7 +533,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException { public void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -483,7 +547,8 @@ public class PutDatabaseRecordTest {
parser.addRecord(1, "rec1", 101); parser.addRecord(1, "rec1", 101);
parser.addRecord(2, "rec2", 102); parser.addRecord(2, "rec2", 102);
parser.addRecord(3, "rec3", 1000); // This record violates the constraint on the "code" column so should result in FlowFile being routed to failure // This record violates the constraint on the "code" column so should result in FlowFile routing to failure
parser.addRecord(3, "rec3", 1000);
parser.addRecord(4, "rec4", 104); parser.addRecord(4, "rec4", 104);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
@ -505,7 +570,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException { public void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.ROLLBACK_1000.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -523,7 +590,6 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
runner.enqueue(new byte[0]); runner.enqueue(new byte[0]);
runner.run(); runner.run();
@ -539,7 +605,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException { public void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -563,7 +631,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException { public void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -587,10 +657,46 @@ public class PutDatabaseRecordTest {
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0); MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0);
final String errorMessage = flowFile.getAttribute("putdatabaserecord.error"); final String errorMessage = flowFile.getAttribute("putdatabaserecord.error");
assertTrue(errorMessage.contains("PERSONS2")); assertTrue(errorMessage.contains("PERSONS2"));
runner.enqueue();
} }
@Test @ParameterizedTest()
void testInsertViaSqlStatementType() throws InitializationException, ProcessException, SQLException { @MethodSource("getTestCases")
public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
};
testInsertViaSqlTypeStatements(sqlStatements, false);
}
@ParameterizedTest()
@MethodSource("getTestCases")
public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
};
testInsertViaSqlTypeStatements(sqlStatements, true);
}
@ParameterizedTest()
@MethodSource("getTestCases")
public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)",
"UPDATE PERSONS SET code = 101 WHERE id = 1"
};
testInsertViaSqlTypeStatements(sqlStatements, false);
}
void testInsertViaSqlTypeStatements(String[] sqlStatements, boolean allowMultipleStatements) throws InitializationException, ProcessException, SQLException {
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -598,13 +704,15 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("sql", RecordFieldType.STRING); parser.addSchemaField("sql", RecordFieldType.STRING);
parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"); for (String sqlStatement : sqlStatements) {
parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)"); parser.addRecord(sqlStatement);
}
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql"); runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, String.valueOf(allowMultipleStatements));
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql"); attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
@ -615,22 +723,48 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
if (sqlStatements.length >= 1) {
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(1, rs.getInt(1)); assertEquals(1, rs.getInt(1));
assertEquals("rec1", rs.getString(2)); assertEquals("rec1", rs.getString(2));
assertEquals(101, rs.getInt(3)); assertEquals(101, rs.getInt(3));
}
if (sqlStatements.length >= 2) {
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(2, rs.getInt(1)); assertEquals(2, rs.getInt(1));
assertEquals("rec2", rs.getString(2)); assertEquals("rec2", rs.getString(2));
assertEquals(102, rs.getInt(3)); assertEquals(102, rs.getInt(3));
}
assertFalse(rs.next()); assertFalse(rs.next());
stmt.close(); stmt.close();
conn.close(); conn.close();
} }
@Test @ParameterizedTest()
void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException { @MethodSource("getTestCases")
public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
};
testMultipleStatementsViaSqlStatementType(sqlStatements);
}
@ParameterizedTest()
@MethodSource("getTestCases")
public void testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
};
testMultipleStatementsViaSqlStatementType(sqlStatements);
}
void testMultipleStatementsViaSqlStatementType(String[] sqlStatements) throws InitializationException, ProcessException, SQLException {
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -638,7 +772,7 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("sql", RecordFieldType.STRING); parser.addSchemaField("sql", RecordFieldType.STRING);
parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)"); parser.addRecord(String.join(";", sqlStatements));
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE);
@ -655,14 +789,18 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
if (sqlStatements.length >= 1) {
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(1, rs.getInt(1)); assertEquals(1, rs.getInt(1));
assertEquals("rec1", rs.getString(2)); assertEquals("rec1", rs.getString(2));
assertEquals(101, rs.getInt(3)); assertEquals(101, rs.getInt(3));
}
if (sqlStatements.length >= 2) {
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(2, rs.getInt(1)); assertEquals(2, rs.getInt(1));
assertEquals("rec2", rs.getString(2)); assertEquals("rec2", rs.getString(2));
assertEquals(102, rs.getInt(3)); assertEquals(102, rs.getInt(3));
}
assertFalse(rs.next()); assertFalse(rs.next());
stmt.close(); stmt.close();
@ -670,7 +808,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException { public void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -706,7 +846,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInvalidData() throws InitializationException, ProcessException, SQLException { public void testInvalidData() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -733,15 +875,19 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
try {
// Transaction should be rolled back and table should remain empty. // Transaction should be rolled back and table should remain empty.
assertFalse(rs.next()); assertFalse(rs.next());
} finally {
stmt.close(); stmt.close();
conn.close(); conn.close();
} }
}
@Test @Test
void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException { public void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -768,15 +914,58 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
try {
// Transaction should be rolled back and table should remain empty. // Transaction should be rolled back and table should remain empty.
assertFalse(rs.next()); assertFalse(rs.next());
} finally {
stmt.close(); stmt.close();
conn.close(); conn.close();
} }
}
@Test @Test
void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException { public void testIOExceptionOnReadDataAutoCommit() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase());
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addRecord(1, "rec1", 101);
parser.addRecord(2, "rec2", 102);
parser.addRecord(3, "rec3", 104);
parser.failAfter(1, MockRecordFailureType.IO_EXCEPTION);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
try {
// Transaction should be rolled back and table should remain empty.
assertFalse(rs.next());
} finally {
stmt.close();
conn.close();
}
}
@Test
public void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -801,7 +990,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException { public void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -815,7 +1006,6 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql"); runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql"); attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
@ -827,8 +1017,11 @@ public class PutDatabaseRecordTest {
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0); runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0);
} }
@Test @ParameterizedTest()
void testUpdate() throws InitializationException, ProcessException, SQLException { @MethodSource("getTestCases")
public void testUpdate(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -873,7 +1066,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testUpdatePkNotFirst() throws InitializationException, ProcessException, SQLException { public void testUpdatePkNotFirst() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable("CREATE TABLE PERSONS (name varchar(100), id integer primary key, code integer)"); recreateTable("CREATE TABLE PERSONS (name varchar(100), id integer primary key, code integer)");
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -917,8 +1112,11 @@ public class PutDatabaseRecordTest {
conn.close(); conn.close();
} }
@Test @ParameterizedTest()
void testUpdateMultipleSchemas() throws InitializationException, ProcessException, SQLException { @MethodSource("getTestCases")
public void testUpdateMultipleSchemas(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
// Manually create and drop the tables and schemas // Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
@ -986,8 +1184,11 @@ public class PutDatabaseRecordTest {
conn.close(); conn.close();
} }
@Test @ParameterizedTest()
void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException { @MethodSource("getTestCases")
public void testUpdateAfterInsert(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1046,7 +1247,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException { public void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)");
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1067,7 +1270,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException { public void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)");
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1113,7 +1318,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException, ProcessException, SQLException { public void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)");
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1159,7 +1366,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException { public void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
recreateTable("CREATE TABLE PERSONS (\"id\" integer, \"name\" varchar(100), \"code\" integer)"); recreateTable("CREATE TABLE PERSONS (\"id\" integer, \"name\" varchar(100), \"code\" integer)");
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1205,8 +1414,11 @@ public class PutDatabaseRecordTest {
conn.close(); conn.close();
} }
@Test @ParameterizedTest()
void testDelete() throws InitializationException, ProcessException, SQLException { @MethodSource("getTestCases")
public void testDelete(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
recreateTable(createPersons); recreateTable(createPersons);
Connection conn = dbcp.getConnection(); Connection conn = dbcp.getConnection();
Statement stmt = conn.createStatement(); Statement stmt = conn.createStatement();
@ -1250,7 +1462,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException { public void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
Connection conn = dbcp.getConnection(); Connection conn = dbcp.getConnection();
Statement stmt = conn.createStatement(); Statement stmt = conn.createStatement();
@ -1294,7 +1508,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testRecordPathOptions() throws InitializationException, SQLException { public void testRecordPathOptions() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)");
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1347,7 +1563,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException { public void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1380,7 +1598,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException { public void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1397,6 +1617,7 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, PutDatabaseRecord.MAX_BATCH_SIZE.getDefaultValue());
Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy(); Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
@ -1412,7 +1633,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testGenerateTableName() throws Exception { public void testGenerateTableName() throws InitializationException, ProcessException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
final List<RecordField> fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()), final List<RecordField> fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("code", RecordFieldType.INT.getDataType()), new RecordField("code", RecordFieldType.INT.getDataType()),
@ -1446,7 +1669,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException { public void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1514,7 +1739,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException { public void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
recreateTable(createPersons); recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser(); final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser); runner.addControllerService("parser", parser);
@ -1547,7 +1774,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testLongVarchar() throws InitializationException, ProcessException, SQLException { public void testLongVarchar() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
// Manually create and drop the tables and schemas // Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
@ -1590,7 +1819,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException { public void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
// Manually create and drop the tables and schemas // Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
@ -1637,7 +1868,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertWithBlobClob() throws Exception { public void testInsertWithBlobClob() throws InitializationException, ProcessException, SQLException, IOException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))";
@ -1688,7 +1921,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertHexStringIntoBinary() throws Exception { public void testInsertHexStringIntoBinary() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL); runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL);
String tableName = "HEX_STRING_TEST"; String tableName = "HEX_STRING_TEST";
@ -1729,7 +1964,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertBase64StringIntoBinary() throws Exception { public void testInsertBase64StringIntoBinary() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64); runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64);
String tableName = "BASE64_STRING_TEST"; String tableName = "BASE64_STRING_TEST";
@ -1770,7 +2007,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertWithBlobClobObjectArraySource() throws Exception { public void testInsertWithBlobClobObjectArraySource() throws InitializationException, ProcessException, SQLException, IOException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))";
@ -1821,7 +2060,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertWithBlobStringSource() throws Exception { public void testInsertWithBlobStringSource() throws InitializationException, ProcessException, SQLException, IOException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))";
@ -1866,7 +2107,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertWithBlobIntegerArraySource() throws Exception { public void testInsertWithBlobIntegerArraySource() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))";
@ -1895,7 +2138,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException { public void testInsertEnum() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2 dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>()); runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>());
@ -1940,7 +2185,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertUUIDColumn() throws InitializationException, ProcessException, SQLException { public void testInsertUUIDColumn() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
// Manually create and drop the tables and schemas // Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
@ -1980,7 +2227,9 @@ public class PutDatabaseRecordTest {
} }
@Test @Test
void testInsertLongVarBinaryColumn() throws InitializationException, ProcessException, SQLException { public void testInsertLongVarBinaryColumn() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
// Manually create and drop the tables and schemas // Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection(); final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
@ -2053,9 +2302,9 @@ public class PutDatabaseRecordTest {
} catch (SQLException ignore) { } catch (SQLException ignore) {
// Do nothing, may not have existed // Do nothing, may not have existed
} }
try (conn; stmt) {
stmt.execute(createSQL); stmt.execute(createSQL);
stmt.close(); }
conn.close();
} }
private Map<String, Object> createValues(final int id, final String name, final int code) { private Map<String, Object> createValues(final int id, final String name, final int code) {
@ -2109,4 +2358,33 @@ public class PutDatabaseRecordTest {
} }
} }
} }
public static class TestCase {
TestCase(Boolean autoCommit, Boolean rollbackOnFailure, Integer batchSize) {
this.autoCommit = autoCommit;
this.rollbackOnFailure = rollbackOnFailure;
this.batchSize = batchSize;
}
private Boolean autoCommit = null;
private Boolean rollbackOnFailure = null;
private Integer batchSize = null;
String getAutoCommitAsString() {
return autoCommit == null ? null : autoCommit.toString();
}
String getRollbackOnFailureAsString() {
return rollbackOnFailure == null ? null : rollbackOnFailure.toString();
}
String getBatchSizeAsString() {
return batchSize == null ? null : batchSize.toString();
}
public String toString() {
return "autoCommit=" + String.valueOf(autoCommit) +
"; rollbackOnFailure=" + String.valueOf(rollbackOnFailure) +
"; batchSize=" + String.valueOf(batchSize);
}
}
} }