diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 26b4281cd0..79b98d20e2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; @@ -376,6 +377,17 @@ public class PutDatabaseRecord extends AbstractProcessor { .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .build(); + static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder() + .name("database-session-autocommit") + .displayName("Database Session AutoCommit") + .description("The autocommit mode to set on the database connection being used. If set to false, the operation(s) will be explicitly committed or rolled back " + + "(based on success or failure respectively). If set to true, the driver/database automatically handles the commit/rollback.") + .allowableValues("true", "false") + .defaultValue("false") + .required(false) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + static final PropertyDescriptor DB_TYPE; protected static final Map dbAdapters; @@ -431,6 +443,7 @@ public class PutDatabaseRecord extends AbstractProcessor { pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); pds.add(TABLE_SCHEMA_CACHE_SIZE); pds.add(MAX_BATCH_SIZE); + pds.add(AUTO_COMMIT); propDescriptors = Collections.unmodifiableList(pds); } @@ -466,9 +479,41 @@ public class PutDatabaseRecord extends AbstractProcessor { ); } + final boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean(); + final boolean rollbackOnFailure = validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean(); + if (autoCommit && rollbackOnFailure) { + validationResults.add(new ValidationResult.Builder() + .subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName()) + .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'. " + + "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'", + RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName())) + .build()); + } + + if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) { + final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'." + + " Batch size equal to zero executes all statements in a single transaction" + + " which allows automatic rollback to revert all statements if an error occurs", + MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName()); + + validationResults.add(new ValidationResult.Builder() + .subject(MAX_BATCH_SIZE.getDisplayName()) + .explanation(explanation) + .build()); + } + return validationResults; } + private boolean isMaxBatchSizeHardcodedToZero(ValidationContext validationContext) { + try { + return !validationContext.getProperty(MAX_BATCH_SIZE).isExpressionLanguagePresent() + && 0 == validationContext.getProperty(MAX_BATCH_SIZE).asInteger(); + } catch (Exception ex) { + return false; + } + } + @OnScheduled public void onScheduled(final ProcessContext context) { databaseAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); @@ -490,6 +535,11 @@ public class PutDatabaseRecord extends AbstractProcessor { dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue); } + @OnUnscheduled + public final void onUnscheduled() { + supportsBatchUpdates = Optional.empty(); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -498,84 +548,98 @@ public class PutDatabaseRecord extends AbstractProcessor { } final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - Optional connectionHolder = Optional.empty(); + Connection connection = null; boolean originalAutoCommit = false; try { - final Connection connection = dbcpService.getConnection(flowFile.getAttributes()); - connectionHolder = Optional.of(connection); + connection = dbcpService.getConnection(flowFile.getAttributes()); originalAutoCommit = connection.getAutoCommit(); - if (originalAutoCommit) { + final boolean autoCommit = context.getProperty(AUTO_COMMIT).asBoolean(); + if (originalAutoCommit != autoCommit) { try { - connection.setAutoCommit(false); + connection.setAutoCommit(autoCommit); } catch (SQLFeatureNotSupportedException sfnse) { - getLogger().debug("setAutoCommit(false) not supported by this driver"); + getLogger().debug(String.format("setAutoCommit(%s) not supported by this driver", autoCommit), sfnse); } } putToDatabase(context, session, flowFile, connection); + // Only commit the connection if auto-commit is false - if (!originalAutoCommit) { + if (!connection.getAutoCommit()) { connection.commit(); } session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection)); } catch (final Exception e) { - // When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again - // might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work. - final Relationship relationship; - final Throwable toAnalyze = (e instanceof BatchUpdateException) ? e.getCause() : e; - if (toAnalyze instanceof SQLTransientException) { - relationship = REL_RETRY; - flowFile = session.penalize(flowFile); - } else { - relationship = REL_FAILURE; - } - - getLogger().error("Failed to put Records to database for {}. Routing to {}.", flowFile, relationship, e); - - final boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean(); - if (rollbackOnFailure) { - session.rollback(); - } else { - flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage())); - session.transfer(flowFile, relationship); - } - - connectionHolder.ifPresent(connection -> { - try { - if (!connection.getAutoCommit()) { - connection.rollback(); - } - } catch (final Exception rollbackException) { - getLogger().error("Failed to rollback JDBC transaction", rollbackException); - } - }); + routeOnException(context, session, connection, e, flowFile); } finally { - if (originalAutoCommit) { - connectionHolder.ifPresent(connection -> { - try { - connection.setAutoCommit(true); - } catch (final Exception autoCommitException) { - getLogger().warn("Failed to set auto-commit back to true on connection", autoCommitException); - } - }); - } - - connectionHolder.ifPresent(connection -> { - try { - connection.close(); - } catch (final Exception closeException) { - getLogger().warn("Failed to close database connection", closeException); - } - }); + closeConnection(connection, originalAutoCommit); } } + private void routeOnException(final ProcessContext context, final ProcessSession session, + Connection connection, Exception e, FlowFile flowFile) { + // When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again + // might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work. + final Relationship relationship; + final Throwable toAnalyze = (e instanceof BatchUpdateException) ? e.getCause() : e; + if (toAnalyze instanceof SQLTransientException) { + relationship = REL_RETRY; + flowFile = session.penalize(flowFile); + } else { + relationship = REL_FAILURE; + } - private void executeSQL(final ProcessContext context, final FlowFile flowFile, final Connection connection, final RecordReader recordReader) + getLogger().error("Failed to put Records to database for {}. Routing to {}.", flowFile, relationship, e); + + final boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean(); + if (rollbackOnFailure) { + session.rollback(); + } else { + flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage())); + session.transfer(flowFile, relationship); + } + + rollbackConnection(connection); + } + + private void rollbackConnection(Connection connection) { + if (connection != null) { + try { + if (!connection.getAutoCommit()) { + connection.rollback(); + } + } catch (final Exception rollbackException) { + getLogger().error("Failed to rollback JDBC transaction", rollbackException); + } + } + } + + private void closeConnection(Connection connection, boolean originalAutoCommit) { + if (connection != null) { + try { + if (originalAutoCommit != connection.getAutoCommit()) { + connection.setAutoCommit(originalAutoCommit); + } + } catch (final Exception autoCommitException) { + getLogger().warn(String.format("Failed to set auto-commit back to %s on connection", originalAutoCommit), autoCommitException); + } + + try { + if (!connection.isClosed()) { + connection.close(); + } + } catch (final Exception closeException) { + getLogger().warn("Failed to close database connection", closeException); + } + } + } + + private void executeSQL(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, + final Connection connection, final RecordReader recordReader) throws IllegalArgumentException, MalformedRecordException, IOException, SQLException { final RecordSchema recordSchema = recordReader.getSchema(); @@ -602,23 +666,66 @@ public class PutDatabaseRecord extends AbstractProcessor { } } - Record currentRecord; - while ((currentRecord = recordReader.nextRecord()) != null) { + final ComponentLog log = getLogger(); + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); + // Do not use batch if set to batch size of 1 because that is similar to not using batching. + // Also do not use batches if the connection does not support batching. + boolean useBatch = maxBatchSize != 1 && isSupportBatchUpdates(connection); + int currentBatchSize = 0; + int batchIndex = 0; + + boolean isFirstRecord = true; + Record nextRecord = recordReader.nextRecord(); + while (nextRecord != null) { + Record currentRecord = nextRecord; final String sql = currentRecord.getAsString(sqlField); + nextRecord = recordReader.nextRecord(); + if (sql == null || StringUtils.isEmpty(sql)) { throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile)); } - // Execute the statement(s) as-is + final String[] sqlStatements; if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) { final String regex = "(? 0 && currentBatchSize >= maxBatchSize) { + batchIndex++; + log.debug("Executing batch with last query {} because batch reached max size %s for {}; batch index: {}; batch size: {}", + sql, maxBatchSize, flowFile, batchIndex, currentBatchSize); + statement.executeBatch(); + session.adjustCounter("Batches Executed", 1, false); + currentBatchSize = 0; + } + } + + if (useBatch && currentBatchSize > 0) { + batchIndex++; + log.debug("Executing last batch because last statement reached for {}; batch index: {}; batch size: {}", + flowFile, batchIndex, currentBatchSize); + statement.executeBatch(); + session.adjustCounter("Batches Executed", 1, false); } } } @@ -730,7 +837,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final List fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes(); final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql(); - if (currentBatchSize > 0 && ps != lastPreparedStatement && lastPreparedStatement != null) { + if (ps != lastPreparedStatement && lastPreparedStatement != null) { batchIndex++; log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}", sql, flowFile, fieldIndexes, batchIndex, currentBatchSize); @@ -1032,7 +1139,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger()); if (SQL_TYPE.equalsIgnoreCase(statementType)) { - executeSQL(context, flowFile, connection, recordReader); + executeSQL(context, session, flowFile, connection, recordReader); } else { final DMLSettings settings = new DMLSettings(context); executeDML(context, session, flowFile, connection, recordReader, statementType, settings); @@ -1493,6 +1600,28 @@ public class PutDatabaseRecord extends AbstractProcessor { return normalizedKeyColumnNames; } + private Optional supportsBatchUpdates = Optional.empty(); + + private void initializeSupportBatchUpdates(Connection connection) { + if (!supportsBatchUpdates.isPresent()) { + try { + final DatabaseMetaData dmd = connection.getMetaData(); + supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates()); + getLogger().debug(String.format("Connection supportsBatchUpdates is %s", + supportsBatchUpdates.orElse(Boolean.FALSE))); + } catch (Exception ex) { + supportsBatchUpdates = Optional.of(Boolean.FALSE); + getLogger().debug(String.format("Exception while testing if connection supportsBatchUpdates due to %s - %s", + ex.getClass().getName(), ex.getMessage())); + } + } + } + + private boolean isSupportBatchUpdates(Connection connection) { + initializeSupportBatchUpdates(connection); + return supportsBatchUpdates.orElse(Boolean.FALSE); + } + static class SchemaKey { private final String catalog; private final String schemaName; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index 70a908b377..d9ef31733f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -37,9 +37,11 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.file.FileUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentMatchers; import org.mockito.stubbing.Answer; @@ -68,6 +70,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.function.Supplier; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -86,7 +89,51 @@ import static org.mockito.Mockito.when; public class PutDatabaseRecordTest { - private static String DBCP_SERVICE_ID = "dbcp"; + private enum TestCaseEnum { + // ENABLED means to use that test case in the parameterized tests. + // DISABLED test cases are used for single-run tests which are not parameterized + DEFAULT_0(ENABLED, new TestCase(false, false, 0)), + DEFAULT_1(DISABLED, new TestCase(false, false, 1)), + DEFAULT_2(DISABLED, new TestCase(false, false, 2)), + DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)), + + ROLLBACK_0(DISABLED, new TestCase(false, true, 0)), + ROLLBACK_1(DISABLED, new TestCase(false, true, 1)), + ROLLBACK_2(DISABLED, new TestCase(false, true, 2)), + ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)), + + // If autoCommit equals true, then rollbackOnFailure must be false AND batchSize must equal 0 + AUTO_COMMIT_0(ENABLED, new TestCase(true, false, 0)); + + private final boolean enabled; + private final TestCase testCase; + + TestCaseEnum(boolean enabled, TestCase t) { + this.enabled = enabled; + this.testCase = t; + } + + public boolean isEnabled() { + return enabled; + } + + public TestCase getTestCase() { + return testCase; + } + + } + + static Stream getTestCases() { + return Arrays.stream(TestCaseEnum.values()) + .filter(TestCaseEnum::isEnabled) + .map(TestCaseEnum::getTestCase) + .map(Arguments::of); + } + + private final static boolean ENABLED = true; + private final static boolean DISABLED = false; + + private final static String DBCP_SERVICE_ID = "dbcp"; private static final String CONNECTION_FAILED = "Connection Failed"; @@ -141,8 +188,7 @@ public class PutDatabaseRecordTest { System.clearProperty("derby.stream.error.file"); } - @BeforeEach - public void setRunner() throws Exception { + private void setRunner(TestCase testCase) throws InitializationException { processor = new PutDatabaseRecord(); //Mock the DBCP Controller Service so we can control the Results dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION)); @@ -153,10 +199,15 @@ public class PutDatabaseRecordTest { runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties); runner.enableControllerService(dbcp); runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); + runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString()); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, testCase.getRollbackOnFailureAsString()); + runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, testCase.getBatchSizeAsString()); } @Test public void testGetConnectionFailure() throws InitializationException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + final MockRecordParser parser = new MockRecordParser(); runner.addControllerService(PARSER_ID, parser); runner.enableControllerService(parser); @@ -175,6 +226,8 @@ public class PutDatabaseRecordTest { @Test public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException { + setRunner(TestCaseEnum.DEFAULT_1.getTestCase()); + dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION); final Map dbcpProperties = new HashMap<>(); runner = TestRunners.newTestRunner(processor); @@ -211,6 +264,8 @@ public class PutDatabaseRecordTest { @Test public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException { + setRunner(TestCaseEnum.DEFAULT_2.getTestCase()); + // Need to override the @Before method with a new processor that behaves badly processor = new PutDatabaseRecordUnmatchedField(); //Mock the DBCP Controller Service so we can control the Results @@ -257,7 +312,8 @@ public class PutDatabaseRecordTest { } @Test - void testGeneratePreparedStatements() throws SQLException, MalformedRecordException { + public void testGeneratePreparedStatements() throws InitializationException, SQLException, MalformedRecordException { + setRunner(TestCaseEnum.DEFAULT_1000.getTestCase()); final List fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()), @@ -295,7 +351,8 @@ public class PutDatabaseRecordTest { } @Test - void testGeneratePreparedStatementsFailUnmatchedField() { + public void testGeneratePreparedStatementsFailUnmatchedField() throws InitializationException { + setRunner(TestCaseEnum.ROLLBACK_0.getTestCase()); final List fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()), @@ -340,8 +397,11 @@ public class PutDatabaseRecordTest { assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage()); } - @Test - void testInsert() throws InitializationException, ProcessException, SQLException, IOException { + @ParameterizedTest() + @MethodSource("getTestCases") + public void testInsert(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -406,7 +466,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException { + public void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.ROLLBACK_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -471,7 +533,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException { + public void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -483,7 +547,8 @@ public class PutDatabaseRecordTest { parser.addRecord(1, "rec1", 101); parser.addRecord(2, "rec2", 102); - parser.addRecord(3, "rec3", 1000); // This record violates the constraint on the "code" column so should result in FlowFile being routed to failure + // This record violates the constraint on the "code" column so should result in FlowFile routing to failure + parser.addRecord(3, "rec3", 1000); parser.addRecord(4, "rec4", 104); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); @@ -505,7 +570,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException { + public void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.ROLLBACK_1000.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -523,7 +590,6 @@ public class PutDatabaseRecordTest { runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); - runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); runner.enqueue(new byte[0]); runner.run(); @@ -539,7 +605,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException { + public void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -563,7 +631,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException { + public void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -587,10 +657,46 @@ public class PutDatabaseRecordTest { MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0); final String errorMessage = flowFile.getAttribute("putdatabaserecord.error"); assertTrue(errorMessage.contains("PERSONS2")); + runner.enqueue(); } - @Test - void testInsertViaSqlStatementType() throws InitializationException, ProcessException, SQLException { + @ParameterizedTest() + @MethodSource("getTestCases") + public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + + String[] sqlStatements = new String[] { + "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)" + }; + testInsertViaSqlTypeStatements(sqlStatements, false); + } + + @ParameterizedTest() + @MethodSource("getTestCases") + public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + + String[] sqlStatements = new String[] { + "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)", + "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);" + }; + testInsertViaSqlTypeStatements(sqlStatements, true); + } + + @ParameterizedTest() + @MethodSource("getTestCases") + public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + + String[] sqlStatements = new String[] { + "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)", + "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)", + "UPDATE PERSONS SET code = 101 WHERE id = 1" + }; + testInsertViaSqlTypeStatements(sqlStatements, false); + } + + void testInsertViaSqlTypeStatements(String[] sqlStatements, boolean allowMultipleStatements) throws InitializationException, ProcessException, SQLException { recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -598,13 +704,15 @@ public class PutDatabaseRecordTest { parser.addSchemaField("sql", RecordFieldType.STRING); - parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"); - parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)"); + for (String sqlStatement : sqlStatements) { + parser.addRecord(sqlStatement); + } runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql"); + runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, String.valueOf(allowMultipleStatements)); final Map attrs = new HashMap<>(); attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql"); @@ -615,22 +723,48 @@ public class PutDatabaseRecordTest { final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("rec1", rs.getString(2)); - assertEquals(101, rs.getInt(3)); - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("rec2", rs.getString(2)); - assertEquals(102, rs.getInt(3)); + if (sqlStatements.length >= 1) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("rec1", rs.getString(2)); + assertEquals(101, rs.getInt(3)); + } + if (sqlStatements.length >= 2) { + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("rec2", rs.getString(2)); + assertEquals(102, rs.getInt(3)); + } assertFalse(rs.next()); stmt.close(); conn.close(); } - @Test - void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException { + @ParameterizedTest() + @MethodSource("getTestCases") + public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + + String[] sqlStatements = new String[] { + "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)" + }; + testMultipleStatementsViaSqlStatementType(sqlStatements); + } + + @ParameterizedTest() + @MethodSource("getTestCases") + public void testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + + String[] sqlStatements = new String[] { + "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)", + "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);" + }; + testMultipleStatementsViaSqlStatementType(sqlStatements); + } + + void testMultipleStatementsViaSqlStatementType(String[] sqlStatements) throws InitializationException, ProcessException, SQLException { recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -638,7 +772,7 @@ public class PutDatabaseRecordTest { parser.addSchemaField("sql", RecordFieldType.STRING); - parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)"); + parser.addRecord(String.join(";", sqlStatements)); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE); @@ -655,14 +789,18 @@ public class PutDatabaseRecordTest { final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("rec1", rs.getString(2)); - assertEquals(101, rs.getInt(3)); - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("rec2", rs.getString(2)); - assertEquals(102, rs.getInt(3)); + if (sqlStatements.length >= 1) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("rec1", rs.getString(2)); + assertEquals(101, rs.getInt(3)); + } + if (sqlStatements.length >= 2) { + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("rec2", rs.getString(2)); + assertEquals(102, rs.getInt(3)); + } assertFalse(rs.next()); stmt.close(); @@ -670,7 +808,9 @@ public class PutDatabaseRecordTest { } @Test - void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException { + public void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -679,8 +819,8 @@ public class PutDatabaseRecordTest { parser.addSchemaField("sql", RecordFieldType.STRING); parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);" + - "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);" + - "INSERT INTO PERSONS2 (id, name, code) VALUES (2, 'rec2',102);"); + "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);" + + "INSERT INTO PERSONS2 (id, name, code) VALUES (2, 'rec2',102);"); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE); @@ -706,7 +846,9 @@ public class PutDatabaseRecordTest { } @Test - void testInvalidData() throws InitializationException, ProcessException, SQLException { + public void testInvalidData() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -733,15 +875,19 @@ public class PutDatabaseRecordTest { final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - // Transaction should be rolled back and table should remain empty. - assertFalse(rs.next()); - - stmt.close(); - conn.close(); + try { + // Transaction should be rolled back and table should remain empty. + assertFalse(rs.next()); + } finally { + stmt.close(); + conn.close(); + } } @Test - void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException { + public void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -768,15 +914,58 @@ public class PutDatabaseRecordTest { final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - // Transaction should be rolled back and table should remain empty. - assertFalse(rs.next()); - - stmt.close(); - conn.close(); + try { + // Transaction should be rolled back and table should remain empty. + assertFalse(rs.next()); + } finally { + stmt.close(); + conn.close(); + } } @Test - void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException { + public void testIOExceptionOnReadDataAutoCommit() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase()); + + recreateTable(createPersons); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("code", RecordFieldType.INT); + + parser.addRecord(1, "rec1", 101); + parser.addRecord(2, "rec2", 102); + parser.addRecord(3, "rec3", 104); + + parser.failAfter(1, MockRecordFailureType.IO_EXCEPTION); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + try { + // Transaction should be rolled back and table should remain empty. + assertFalse(rs.next()); + } finally { + stmt.close(); + conn.close(); + } + } + + @Test + public void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -801,7 +990,9 @@ public class PutDatabaseRecordTest { } @Test - void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException { + public void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.ROLLBACK_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -815,7 +1006,6 @@ public class PutDatabaseRecordTest { runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql"); - runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); final Map attrs = new HashMap<>(); attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql"); @@ -827,8 +1017,11 @@ public class PutDatabaseRecordTest { runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0); } - @Test - void testUpdate() throws InitializationException, ProcessException, SQLException { + @ParameterizedTest() + @MethodSource("getTestCases") + public void testUpdate(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -873,7 +1066,9 @@ public class PutDatabaseRecordTest { } @Test - void testUpdatePkNotFirst() throws InitializationException, ProcessException, SQLException { + public void testUpdatePkNotFirst() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable("CREATE TABLE PERSONS (name varchar(100), id integer primary key, code integer)"); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -917,8 +1112,11 @@ public class PutDatabaseRecordTest { conn.close(); } - @Test - void testUpdateMultipleSchemas() throws InitializationException, ProcessException, SQLException { + @ParameterizedTest() + @MethodSource("getTestCases") + public void testUpdateMultipleSchemas(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + // Manually create and drop the tables and schemas final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); @@ -986,8 +1184,11 @@ public class PutDatabaseRecordTest { conn.close(); } - @Test - void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException { + @ParameterizedTest() + @MethodSource("getTestCases") + public void testUpdateAfterInsert(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1046,7 +1247,9 @@ public class PutDatabaseRecordTest { } @Test - void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException { + public void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1067,7 +1270,9 @@ public class PutDatabaseRecordTest { } @Test - void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException { + public void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1113,7 +1318,9 @@ public class PutDatabaseRecordTest { } @Test - void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException, ProcessException, SQLException { + public void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_1.getTestCase()); + recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1159,7 +1366,9 @@ public class PutDatabaseRecordTest { } @Test - void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException { + public void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_1000.getTestCase()); + recreateTable("CREATE TABLE PERSONS (\"id\" integer, \"name\" varchar(100), \"code\" integer)"); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1205,8 +1414,11 @@ public class PutDatabaseRecordTest { conn.close(); } - @Test - void testDelete() throws InitializationException, ProcessException, SQLException { + @ParameterizedTest() + @MethodSource("getTestCases") + public void testDelete(TestCase testCase) throws InitializationException, ProcessException, SQLException { + setRunner(testCase); + recreateTable(createPersons); Connection conn = dbcp.getConnection(); Statement stmt = conn.createStatement(); @@ -1250,7 +1462,9 @@ public class PutDatabaseRecordTest { } @Test - void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException { + public void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_2.getTestCase()); + recreateTable(createPersons); Connection conn = dbcp.getConnection(); Statement stmt = conn.createStatement(); @@ -1294,7 +1508,9 @@ public class PutDatabaseRecordTest { } @Test - void testRecordPathOptions() throws InitializationException, SQLException { + public void testRecordPathOptions() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), code integer)"); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1347,7 +1563,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException { + public void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1380,7 +1598,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException { + public void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_1000.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1397,6 +1617,7 @@ public class PutDatabaseRecordTest { runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, PutDatabaseRecord.MAX_BATCH_SIZE.getDefaultValue()); Supplier spyStmt = createPreparedStatementSpy(); @@ -1412,7 +1633,9 @@ public class PutDatabaseRecordTest { } @Test - void testGenerateTableName() throws Exception { + public void testGenerateTableName() throws InitializationException, ProcessException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + final List fields = Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("code", RecordFieldType.INT.getDataType()), @@ -1446,7 +1669,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException { + public void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1514,7 +1739,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException { + public void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + recreateTable(createPersons); final MockRecordParser parser = new MockRecordParser(); runner.addControllerService("parser", parser); @@ -1547,7 +1774,9 @@ public class PutDatabaseRecordTest { } @Test - void testLongVarchar() throws InitializationException, ProcessException, SQLException { + public void testLongVarchar() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + // Manually create and drop the tables and schemas final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); @@ -1590,7 +1819,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException { + public void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + // Manually create and drop the tables and schemas final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); @@ -1637,7 +1868,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertWithBlobClob() throws Exception { + public void testInsertWithBlobClob() throws InitializationException, ProcessException, SQLException, IOException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; @@ -1688,7 +1921,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertHexStringIntoBinary() throws Exception { + public void testInsertHexStringIntoBinary() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL); String tableName = "HEX_STRING_TEST"; @@ -1729,7 +1964,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertBase64StringIntoBinary() throws Exception { + public void testInsertBase64StringIntoBinary() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64); String tableName = "BASE64_STRING_TEST"; @@ -1770,7 +2007,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertWithBlobClobObjectArraySource() throws Exception { + public void testInsertWithBlobClobObjectArraySource() throws InitializationException, ProcessException, SQLException, IOException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; @@ -1821,7 +2060,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertWithBlobStringSource() throws Exception { + public void testInsertWithBlobStringSource() throws InitializationException, ProcessException, SQLException, IOException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; @@ -1866,7 +2107,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertWithBlobIntegerArraySource() throws Exception { + public void testInsertWithBlobIntegerArraySource() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"; @@ -1895,7 +2138,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException { + public void testInsertEnum() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2 runner = TestRunners.newTestRunner(processor); runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>()); @@ -1940,7 +2185,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertUUIDColumn() throws InitializationException, ProcessException, SQLException { + public void testInsertUUIDColumn() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + // Manually create and drop the tables and schemas final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); @@ -1980,7 +2227,9 @@ public class PutDatabaseRecordTest { } @Test - void testInsertLongVarBinaryColumn() throws InitializationException, ProcessException, SQLException { + public void testInsertLongVarBinaryColumn() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + // Manually create and drop the tables and schemas final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); @@ -2023,7 +2272,7 @@ public class PutDatabaseRecordTest { private void recreateTable() throws ProcessException { try (final Connection conn = dbcp.getConnection(); - final Statement stmt = conn.createStatement()) { + final Statement stmt = conn.createStatement()) { stmt.execute("drop table PERSONS"); stmt.execute(createPersons); } catch (SQLException ignore) { @@ -2053,9 +2302,9 @@ public class PutDatabaseRecordTest { } catch (SQLException ignore) { // Do nothing, may not have existed } - stmt.execute(createSQL); - stmt.close(); - conn.close(); + try (conn; stmt) { + stmt.execute(createSQL); + } } private Map createValues(final int id, final String name, final int code) { @@ -2109,4 +2358,33 @@ public class PutDatabaseRecordTest { } } } + + public static class TestCase { + TestCase(Boolean autoCommit, Boolean rollbackOnFailure, Integer batchSize) { + this.autoCommit = autoCommit; + this.rollbackOnFailure = rollbackOnFailure; + this.batchSize = batchSize; + } + private Boolean autoCommit = null; + private Boolean rollbackOnFailure = null; + private Integer batchSize = null; + + String getAutoCommitAsString() { + return autoCommit == null ? null : autoCommit.toString(); + } + + String getRollbackOnFailureAsString() { + return rollbackOnFailure == null ? null : rollbackOnFailure.toString(); + } + + String getBatchSizeAsString() { + return batchSize == null ? null : batchSize.toString(); + } + + public String toString() { + return "autoCommit=" + String.valueOf(autoCommit) + + "; rollbackOnFailure=" + String.valueOf(rollbackOnFailure) + + "; batchSize=" + String.valueOf(batchSize); + } + } } \ No newline at end of file