NIFI-8172: Provide schema name to getPrimaryKeys call in PutDatabaseRecord

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4782.
This commit is contained in:
Matthew Burgess 2021-01-29 15:01:13 -05:00 committed by Pierre Villard
parent a9b8635ac9
commit b77dbd5030
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 171 additions and 45 deletions

View File

@ -207,7 +207,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
static final PropertyDescriptor CATALOG_NAME = new Builder() static final PropertyDescriptor CATALOG_NAME = new Builder()
.name("put-db-record-catalog-name") .name("put-db-record-catalog-name")
.displayName("Catalog Name") .displayName("Catalog Name")
.description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty") .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the "
+ "property is set and the database is case-sensitive, the catalog name must match the database's catalog name exactly.")
.required(false) .required(false)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -216,7 +217,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
static final PropertyDescriptor SCHEMA_NAME = new Builder() static final PropertyDescriptor SCHEMA_NAME = new Builder()
.name("put-db-record-schema-name") .name("put-db-record-schema-name")
.displayName("Schema Name") .displayName("Schema Name")
.description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty") .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the "
+ "property is set and the database is case-sensitive, the schema name must match the database's schema name exactly.")
.required(false) .required(false)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -225,7 +227,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
static final PropertyDescriptor TABLE_NAME = new Builder() static final PropertyDescriptor TABLE_NAME = new Builder()
.name("put-db-record-table-name") .name("put-db-record-table-name")
.displayName("Table Name") .displayName("Table Name")
.description("The name of the table that the statement should affect.") .description("The name of the table that the statement should affect. Note that if the database is case-sensitive, the table name must match the database's table name exactly.")
.required(true) .required(true)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -887,7 +889,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null && !settings.ignoreUnmappedFields) { if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
if (desc != null) { if (desc != null) {
@ -903,6 +906,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(desc.getColumnName()); sqlBuilder.append(desc.getColumnName());
} }
includedColumns.add(i); includedColumns.add(i);
} else {
// User is ignoring unmapped fields, but log at debug level just in case
getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
} }
@ -912,7 +919,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(")"); sqlBuilder.append(")");
if (fieldsFound.get() == 0) { if (fieldsFound.get() == 0) {
throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table"); throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
} }
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns); return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
@ -940,7 +948,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null && !settings.ignoreUnmappedFields) { if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
if (desc != null) { if (desc != null) {
@ -950,6 +959,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
usedColumnNames.add(desc.getColumnName()); usedColumnNames.add(desc.getColumnName());
} }
usedColumnIndices.add(i); usedColumnIndices.add(i);
} else {
// User is ignoring unmapped fields, but log at debug level just in case
getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
} }
} }
@ -981,7 +994,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null && !settings.ignoreUnmappedFields) { if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
if (desc != null) { if (desc != null) {
@ -991,6 +1005,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
usedColumnNames.add(desc.getColumnName()); usedColumnNames.add(desc.getColumnName());
} }
usedColumnIndices.add(i); usedColumnIndices.add(i);
} else {
// User is ignoring unmapped fields, but log at debug level just in case
getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
} }
} }
@ -1029,8 +1047,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null) { if (desc == null) {
if (!settings.ignoreUnmappedFields) { if (!settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} else { } else {
// User is ignoring unmapped fields, but log at debug level just in case
getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
continue; continue;
} }
} }
@ -1127,7 +1149,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames)); final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null && !settings.ignoreUnmappedFields) { if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database"); throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
if (desc != null) { if (desc != null) {
@ -1150,12 +1173,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(columnName); sqlBuilder.append(columnName);
sqlBuilder.append(" is null AND ? is null))"); sqlBuilder.append(" is null AND ? is null))");
includedColumns.add(i); includedColumns.add(i);
} else {
// User is ignoring unmapped fields, but log at debug level just in case
getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
} }
if (fieldsFound.get() == 0) { if (fieldsFound.get() == 0) {
throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table"); throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
} }
} }
@ -1192,7 +1219,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
} }
if (updateKeyColumnNames.isEmpty()) { if (updateKeyColumnNames.isEmpty()) {
throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified"); throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' not found or does not have a Primary Key and no Update Keys were specified");
} }
return updateKeyColumnNames; return updateKeyColumnNames;
@ -1281,7 +1308,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final Set<String> primaryKeyColumns = new HashSet<>(); final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) { if (includePrimaryKeys) {
try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, null, tableName)) { try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, schema, tableName)) {
while (pkrs.next()) { while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME"); final String colName = pkrs.getString("COLUMN_NAME");

View File

@ -48,7 +48,6 @@ import java.sql.SQLException
import java.sql.SQLNonTransientConnectionException import java.sql.SQLNonTransientConnectionException
import java.sql.Statement import java.sql.Statement
import java.time.LocalDate import java.time.LocalDate
import java.time.ZoneId
import java.time.ZoneOffset import java.time.ZoneOffset
import java.util.function.Supplier import java.util.function.Supplier
@ -72,8 +71,11 @@ import static org.mockito.Mockito.verify
class TestPutDatabaseRecord { class TestPutDatabaseRecord {
private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," + private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
" code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000)," + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
" dt date)" private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
" code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
" code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
private final static String DB_LOCATION = "target/db_pdr" private final static String DB_LOCATION = "target/db_pdr"
TestRunner runner TestRunner runner
@ -214,28 +216,28 @@ class TestPutDatabaseRecord {
generateInsert(schema, 'PERSONS', tableSchema, settings) generateInsert(schema, 'PERSONS', tableSchema, settings)
fail('generateInsert should fail with unmatched fields') fail('generateInsert should fail with unmatched fields')
} catch (SQLDataException e) { } catch (SQLDataException e) {
assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
} }
try { try {
generateUpdate(schema, 'PERSONS', null, tableSchema, settings) generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
fail('generateUpdate should fail with unmatched fields') fail('generateUpdate should fail with unmatched fields')
} catch (SQLDataException e) { } catch (SQLDataException e) {
assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
} }
try { try {
generateDelete(schema, 'PERSONS', tableSchema, settings) generateDelete(schema, 'PERSONS', tableSchema, settings)
fail('generateDelete should fail with unmatched fields') fail('generateDelete should fail with unmatched fields')
} catch (SQLDataException e) { } catch (SQLDataException e) {
assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
} }
} }
} }
@Test @Test
void testInsert() throws InitializationException, ProcessException, SQLException, IOException { void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -302,7 +304,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException { void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -336,7 +338,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -369,8 +371,8 @@ class TestPutDatabaseRecord {
} }
@Test @Test
void testInsertNoTable() throws InitializationException, ProcessException, SQLException, IOException { void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -392,9 +394,33 @@ class TestPutDatabaseRecord {
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
} }
@Test
void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)
parser.addRecord(1, 'rec1', 101)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS2')
runner.enqueue(new byte[0])
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
}
@Test @Test
void testInsertViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException { void testInsertViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -434,7 +460,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException { void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -474,7 +500,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException { void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -510,7 +536,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testInvalidData() throws InitializationException, ProcessException, SQLException, IOException { void testInvalidData() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -545,7 +571,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException, IOException { void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -580,7 +606,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException { void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -605,7 +631,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -632,7 +658,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testUpdate() throws InitializationException, ProcessException, SQLException, IOException { void testUpdate() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -675,9 +701,82 @@ class TestPutDatabaseRecord {
conn.close() conn.close()
} }
@Test
void testUpdateMultipleSchemas() throws InitializationException, ProcessException, SQLException, IOException {
// Manually create and drop the tables and schemas
def conn = dbcp.connection
def stmt = conn.createStatement()
stmt.execute('create schema SCHEMA1')
stmt.execute('create schema SCHEMA2')
stmt.execute(createPersonsSchema1)
stmt.execute(createPersonsSchema2)
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)
parser.addRecord(1, 'rec1', 201)
parser.addRecord(2, 'rec2', 202)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
runner.setProperty(PutDatabaseRecord.SCHEMA_NAME, "SCHEMA1")
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
// Set some existing records with different values for name and code
Exception e
ResultSet rs
try {
stmt.execute('''INSERT INTO SCHEMA1.PERSONS VALUES (1,'x1',101,null)''')
stmt.execute('''INSERT INTO SCHEMA2.PERSONS VALUES (2,'x2',102,null)''')
runner.enqueue(new byte[0])
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
rs = stmt.executeQuery('SELECT * FROM SCHEMA1.PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(201, rs.getInt(3))
assertFalse(rs.next())
rs = stmt.executeQuery('SELECT * FROM SCHEMA2.PERSONS')
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
// Values should not have been updated
assertEquals('x2', rs.getString(2))
assertEquals(102, rs.getInt(3))
assertFalse(rs.next())
} catch(ex) {
e = ex
}
// Drop the schemas here so as not to interfere with other tests
stmt.execute("drop table SCHEMA1.PERSONS")
stmt.execute("drop table SCHEMA2.PERSONS")
stmt.execute("drop schema SCHEMA1 RESTRICT")
stmt.execute("drop schema SCHEMA2 RESTRICT")
stmt.close()
// Don't proceed if there was a problem with the asserts
if(e) throw e
rs = conn.metaData.schemas
List<String> schemas = new ArrayList<>()
while(rs.next()) {
schemas += rs.getString(1)
}
assertFalse(schemas.contains('SCHEMA1'))
assertFalse(schemas.contains('SCHEMA2'))
conn.close()
}
@Test @Test
void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException { void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -699,7 +798,6 @@ class TestPutDatabaseRecord {
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
final Connection conn = dbcp.getConnection() final Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement() Statement stmt = conn.createStatement()
stmt = conn.createStatement()
ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next()) assertTrue(rs.next())
assertEquals(1, rs.getInt(1)) assertEquals(1, rs.getInt(1))
@ -737,7 +835,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException { void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -753,12 +851,12 @@ class TestPutDatabaseRecord {
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0) MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0)
assertEquals('Table \'PERSONS\' does not have a Primary Key and no Update Keys were specified', flowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR)) assertEquals('Table \'PERSONS\' not found or does not have a Primary Key and no Update Keys were specified', flowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR))
} }
@Test @Test
void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException { void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -804,7 +902,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testDelete() throws InitializationException, ProcessException, SQLException, IOException { void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
Connection conn = dbcp.getConnection() Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement() Statement stmt = conn.createStatement()
stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)") stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
@ -848,7 +946,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException { void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
Connection conn = dbcp.getConnection() Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement() Statement stmt = conn.createStatement()
stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)") stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
@ -892,7 +990,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testRecordPathOptions() { void testRecordPathOptions() {
recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)') recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -944,7 +1042,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException { void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -977,7 +1075,7 @@ class TestPutDatabaseRecord {
@Test @Test
void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException { void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons) recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser) runner.addControllerService("parser", parser)
runner.enableControllerService(parser) runner.enableControllerService(parser)
@ -1036,18 +1134,19 @@ class TestPutDatabaseRecord {
} }
} }
private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException { private void recreateTable(String createSQL) throws ProcessException, SQLException {
final Connection conn = dbcp.getConnection() final Connection conn = dbcp.getConnection()
final Statement stmt = conn.createStatement() final Statement stmt = conn.createStatement()
try { try {
stmt.executeUpdate("drop table " + tableName) stmt.execute("drop table PERSONS")
} catch (SQLException ignore) { } catch (SQLException ignore) {
// Do nothing, may not have existed // Do nothing, may not have existed
} }
stmt.executeUpdate(createSQL) stmt.execute(createSQL)
stmt.close() stmt.close()
conn.close() conn.close()
} }
@Test @Test
void testGenerateTableName() throws Exception { void testGenerateTableName() throws Exception {