From b77dbd503099addb760e2d1f4b1249a90839cd97 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Fri, 29 Jan 2021 15:01:13 -0500 Subject: [PATCH] NIFI-8172: Provide schema name to getPrimaryKeys call in PutDatabaseRecord Signed-off-by: Pierre Villard This closes #4782. --- .../standard/PutDatabaseRecord.java | 53 ++++-- .../standard/TestPutDatabaseRecord.groovy | 163 ++++++++++++++---- 2 files changed, 171 insertions(+), 45 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index f3dd970b30..5ef883a66c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -207,7 +207,8 @@ public class PutDatabaseRecord extends AbstractProcessor { static final PropertyDescriptor CATALOG_NAME = new Builder() .name("put-db-record-catalog-name") .displayName("Catalog Name") - .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty") + .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the " + + "property is set and the database is case-sensitive, the catalog name must match the database's catalog name exactly.") .required(false) .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -216,7 +217,8 @@ public class PutDatabaseRecord extends AbstractProcessor { static final PropertyDescriptor SCHEMA_NAME = new Builder() .name("put-db-record-schema-name") .displayName("Schema Name") - .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty") + .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the " + + "property is set and the database is case-sensitive, the schema name must match the database's schema name exactly.") .required(false) .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -225,7 +227,7 @@ public class PutDatabaseRecord extends AbstractProcessor { static final PropertyDescriptor TABLE_NAME = new Builder() .name("put-db-record-table-name") .displayName("Table Name") - .description("The name of the table that the statement should affect.") + .description("The name of the table that the statement should affect. Note that if the database is case-sensitive, the table name must match the database's table name exactly.") .required(true) .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -887,7 +889,8 @@ public class PutDatabaseRecord extends AbstractProcessor { final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); if (desc == null && !settings.ignoreUnmappedFields) { - throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); + throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } if (desc != null) { @@ -903,6 +906,10 @@ public class PutDatabaseRecord extends AbstractProcessor { sqlBuilder.append(desc.getColumnName()); } includedColumns.add(i); + } else { + // User is ignoring unmapped fields, but log at debug level just in case + getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } } @@ -912,7 +919,8 @@ public class PutDatabaseRecord extends AbstractProcessor { sqlBuilder.append(")"); if (fieldsFound.get() == 0) { - throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table"); + throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } } return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns); @@ -940,7 +948,8 @@ public class PutDatabaseRecord extends AbstractProcessor { final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); if (desc == null && !settings.ignoreUnmappedFields) { - throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); + throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } if (desc != null) { @@ -950,6 +959,10 @@ public class PutDatabaseRecord extends AbstractProcessor { usedColumnNames.add(desc.getColumnName()); } usedColumnIndices.add(i); + } else { + // User is ignoring unmapped fields, but log at debug level just in case + getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } } } @@ -981,7 +994,8 @@ public class PutDatabaseRecord extends AbstractProcessor { final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); if (desc == null && !settings.ignoreUnmappedFields) { - throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); + throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } if (desc != null) { @@ -991,6 +1005,10 @@ public class PutDatabaseRecord extends AbstractProcessor { usedColumnNames.add(desc.getColumnName()); } usedColumnIndices.add(i); + } else { + // User is ignoring unmapped fields, but log at debug level just in case + getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } } } @@ -1029,8 +1047,12 @@ public class PutDatabaseRecord extends AbstractProcessor { final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); if (desc == null) { if (!settings.ignoreUnmappedFields) { - throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); + throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } else { + // User is ignoring unmapped fields, but log at debug level just in case + getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); continue; } } @@ -1127,7 +1149,8 @@ public class PutDatabaseRecord extends AbstractProcessor { final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); if (desc == null && !settings.ignoreUnmappedFields) { - throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); + throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } if (desc != null) { @@ -1150,12 +1173,16 @@ public class PutDatabaseRecord extends AbstractProcessor { sqlBuilder.append(columnName); sqlBuilder.append(" is null AND ? is null))"); includedColumns.add(i); - + } else { + // User is ignoring unmapped fields, but log at debug level just in case + getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } } if (fieldsFound.get() == 0) { - throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table"); + throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table\n" + + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); } } @@ -1192,7 +1219,7 @@ public class PutDatabaseRecord extends AbstractProcessor { } if (updateKeyColumnNames.isEmpty()) { - throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified"); + throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' not found or does not have a Primary Key and no Update Keys were specified"); } return updateKeyColumnNames; @@ -1281,7 +1308,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final Set primaryKeyColumns = new HashSet<>(); if (includePrimaryKeys) { - try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, null, tableName)) { + try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, schema, tableName)) { while (pkrs.next()) { final String colName = pkrs.getString("COLUMN_NAME"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index 558eb2f980..d252f5cec8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -48,7 +48,6 @@ import java.sql.SQLException import java.sql.SQLNonTransientConnectionException import java.sql.Statement import java.time.LocalDate -import java.time.ZoneId import java.time.ZoneOffset import java.util.function.Supplier @@ -72,8 +71,11 @@ import static org.mockito.Mockito.verify class TestPutDatabaseRecord { private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," + - " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000)," + - " dt date)" + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)" + private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)" + private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)" private final static String DB_LOCATION = "target/db_pdr" TestRunner runner @@ -214,28 +216,28 @@ class TestPutDatabaseRecord { generateInsert(schema, 'PERSONS', tableSchema, settings) fail('generateInsert should fail with unmatched fields') } catch (SQLDataException e) { - assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) + assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage()) } try { generateUpdate(schema, 'PERSONS', null, tableSchema, settings) fail('generateUpdate should fail with unmatched fields') } catch (SQLDataException e) { - assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) + assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage()) } try { generateDelete(schema, 'PERSONS', tableSchema, settings) fail('generateDelete should fail with unmatched fields') } catch (SQLDataException e) { - assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) + assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage()) } } } @Test void testInsert() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -302,7 +304,7 @@ class TestPutDatabaseRecord { @Test void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -336,7 +338,7 @@ class TestPutDatabaseRecord { @Test void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -369,8 +371,8 @@ class TestPutDatabaseRecord { } @Test - void testInsertNoTable() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -392,9 +394,33 @@ class TestPutDatabaseRecord { runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) } + @Test + void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable(createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + parser.addRecord(1, 'rec1', 101) + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS2') + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) + runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) + } + @Test void testInsertViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -434,7 +460,7 @@ class TestPutDatabaseRecord { @Test void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -474,7 +500,7 @@ class TestPutDatabaseRecord { @Test void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -510,7 +536,7 @@ class TestPutDatabaseRecord { @Test void testInvalidData() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -545,7 +571,7 @@ class TestPutDatabaseRecord { @Test void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -580,7 +606,7 @@ class TestPutDatabaseRecord { @Test void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -605,7 +631,7 @@ class TestPutDatabaseRecord { @Test void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -632,7 +658,7 @@ class TestPutDatabaseRecord { @Test void testUpdate() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -675,9 +701,82 @@ class TestPutDatabaseRecord { conn.close() } + @Test + void testUpdateMultipleSchemas() throws InitializationException, ProcessException, SQLException, IOException { + // Manually create and drop the tables and schemas + def conn = dbcp.connection + def stmt = conn.createStatement() + stmt.execute('create schema SCHEMA1') + stmt.execute('create schema SCHEMA2') + stmt.execute(createPersonsSchema1) + stmt.execute(createPersonsSchema2) + + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + parser.addRecord(1, 'rec1', 201) + parser.addRecord(2, 'rec2', 202) + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE) + runner.setProperty(PutDatabaseRecord.SCHEMA_NAME, "SCHEMA1") + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + + // Set some existing records with different values for name and code + Exception e + ResultSet rs + try { + stmt.execute('''INSERT INTO SCHEMA1.PERSONS VALUES (1,'x1',101,null)''') + stmt.execute('''INSERT INTO SCHEMA2.PERSONS VALUES (2,'x2',102,null)''') + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + rs = stmt.executeQuery('SELECT * FROM SCHEMA1.PERSONS') + assertTrue(rs.next()) + assertEquals(1, rs.getInt(1)) + assertEquals('rec1', rs.getString(2)) + assertEquals(201, rs.getInt(3)) + assertFalse(rs.next()) + rs = stmt.executeQuery('SELECT * FROM SCHEMA2.PERSONS') + assertTrue(rs.next()) + assertEquals(2, rs.getInt(1)) + // Values should not have been updated + assertEquals('x2', rs.getString(2)) + assertEquals(102, rs.getInt(3)) + assertFalse(rs.next()) + } catch(ex) { + e = ex + } + + // Drop the schemas here so as not to interfere with other tests + stmt.execute("drop table SCHEMA1.PERSONS") + stmt.execute("drop table SCHEMA2.PERSONS") + stmt.execute("drop schema SCHEMA1 RESTRICT") + stmt.execute("drop schema SCHEMA2 RESTRICT") + stmt.close() + + // Don't proceed if there was a problem with the asserts + if(e) throw e + rs = conn.metaData.schemas + List schemas = new ArrayList<>() + while(rs.next()) { + schemas += rs.getString(1) + } + assertFalse(schemas.contains('SCHEMA1')) + assertFalse(schemas.contains('SCHEMA2')) + conn.close() + } + @Test void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -699,7 +798,6 @@ class TestPutDatabaseRecord { runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) final Connection conn = dbcp.getConnection() Statement stmt = conn.createStatement() - stmt = conn.createStatement() ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') assertTrue(rs.next()) assertEquals(1, rs.getInt(1)) @@ -737,7 +835,7 @@ class TestPutDatabaseRecord { @Test void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') + recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -753,12 +851,12 @@ class TestPutDatabaseRecord { runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0) - assertEquals('Table \'PERSONS\' does not have a Primary Key and no Update Keys were specified', flowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR)) + assertEquals('Table \'PERSONS\' not found or does not have a Primary Key and no Update Keys were specified', flowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR)) } @Test void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') + recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -804,7 +902,7 @@ class TestPutDatabaseRecord { @Test void testDelete() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) Connection conn = dbcp.getConnection() Statement stmt = conn.createStatement() stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)") @@ -848,7 +946,7 @@ class TestPutDatabaseRecord { @Test void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) Connection conn = dbcp.getConnection() Statement stmt = conn.createStatement() stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)") @@ -892,7 +990,7 @@ class TestPutDatabaseRecord { @Test void testRecordPathOptions() { - recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') + recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -944,7 +1042,7 @@ class TestPutDatabaseRecord { @Test void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -977,7 +1075,7 @@ class TestPutDatabaseRecord { @Test void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException { - recreateTable("PERSONS", createPersons) + recreateTable(createPersons) final MockRecordParser parser = new MockRecordParser() runner.addControllerService("parser", parser) runner.enableControllerService(parser) @@ -1036,18 +1134,19 @@ class TestPutDatabaseRecord { } } - private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException { + private void recreateTable(String createSQL) throws ProcessException, SQLException { final Connection conn = dbcp.getConnection() final Statement stmt = conn.createStatement() try { - stmt.executeUpdate("drop table " + tableName) + stmt.execute("drop table PERSONS") } catch (SQLException ignore) { // Do nothing, may not have existed } - stmt.executeUpdate(createSQL) + stmt.execute(createSQL) stmt.close() conn.close() } + @Test void testGenerateTableName() throws Exception {