NIFI-9607: Honor Update Keys when Quoting Identifiers in PutDatabaseRecord

NIFI-9607: Fixed wrong column name in WHERE clause for generateUpdate

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5701.
This commit is contained in:
Matthew Burgess 2022-01-21 16:52:11 -05:00 committed by Nathan Gough
parent 4d8c79d7f3
commit 03165ad817
2 changed files with 60 additions and 17 deletions

View File

@ -603,14 +603,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
}
// Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
// cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
final boolean includePrimaryKeys = updateKeys == null;
final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
final TableSchema tableSchema = schemaCache.get(schemaKey, key -> {
try {
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys, log);
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, updateKeys, log);
getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
return schema;
} catch (SQLException e) {
@ -1189,12 +1185,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
}
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
AtomicInteger whereFieldCount = new AtomicInteger(0);
for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
@ -1207,14 +1199,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (whereFieldCount.getAndIncrement() > 0) {
sqlBuilder.append(" AND ");
} else if (i == 0) {
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
}
if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(normalizedColName)
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
} else {
sqlBuilder.append(normalizedColName);
sqlBuilder.append(desc.getColumnName());
}
sqlBuilder.append(" = ?");
includedColumns.add(i);
@ -1363,10 +1358,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
getLogger().warn(missingColMessage);
}
}
// Optionally quote the name before returning
if (settings.escapeColumnNames) {
normalizedKeyColumnName = quoteString + normalizedKeyColumnName + quoteString;
}
normalizedKeyColumnNames.add(normalizedKeyColumnName);
}
@ -1419,7 +1410,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys, ComponentLog log) throws SQLException {
final boolean translateColumnNames, final String updateKeys, ComponentLog log) throws SQLException {
final DatabaseMetaData dmd = conn.getMetaData();
try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
@ -1455,7 +1446,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
if (updateKeys == null) {
try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, schema, tableName)) {
while (pkrs.next()) {
@ -1463,6 +1454,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
}
}
} else {
// Parse the Update Keys field and normalize the column names
for (final String updateKey : updateKeys.split(",")) {
primaryKeyColumns.add(normalizeColumnName(updateKey.trim(), translateColumnNames));
}
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());

View File

@ -966,6 +966,53 @@ class TestPutDatabaseRecord {
conn.close()
}
@Test
void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable('CREATE TABLE PERSONS ("id" integer, name varchar(100), code integer)')
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)
parser.addRecord(1, 'rec1', 201)
parser.addRecord(2, 'rec2', 202)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
runner.setProperty(PutDatabaseRecord.UPDATE_KEYS, '${updateKey}')
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
runner.setProperty(PutDatabaseRecord.QUOTE_IDENTIFIERS, 'true')
// Set some existing records with different values for name and code
final Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement()
stmt.execute('''INSERT INTO PERSONS VALUES (1,'x1',101)''')
stmt.execute('''INSERT INTO PERSONS VALUES (2,'x2',102)''')
stmt.close()
runner.enqueue(new byte[0], ['updateKey': 'id'])
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
stmt = conn.createStatement()
final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(201, rs.getInt(3))
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(202, rs.getInt(3))
assertFalse(rs.next())
stmt.close()
conn.close()
}
@Test
void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable(createPersons)