NIFI-3742: Fixed handling of nulls in DELETE for PutDatabaseRecord

This closes #1709.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matt Burgess 2017-04-27 11:36:47 -04:00 committed by Koji Kawamura
parent 8651d79778
commit d61f519326
2 changed files with 71 additions and 8 deletions

View File

@ -646,12 +646,24 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
if (values != null) {
if (fieldIndexes != null) {
for (int i = 0; i < fieldIndexes.size(); i++) {
ps.setObject(i + 1, values[fieldIndexes.get(i)]);
// If DELETE type, insert the object twice because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
ps.setObject(i * 2 + 1, values[fieldIndexes.get(i)]);
ps.setObject(i * 2 + 2, values[fieldIndexes.get(i)]);
} else {
ps.setObject(i + 1, values[fieldIndexes.get(i)]);
}
}
} else {
// If there's no index map, assume all values are included and set them in order
for (int i = 0; i < values.length; i++) {
ps.setObject(i + 1, values[i]);
// If DELETE type, insert the object twice because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
ps.setObject(i * 2 + 1, values[i]);
ps.setObject(i * 2 + 2, values[i]);
} else {
ps.setObject(i + 1, values[i]);
}
}
}
ps.addBatch();
@ -935,14 +947,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
sqlBuilder.append(" AND ");
}
String columnName;
if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
columnName = tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString();
} else {
sqlBuilder.append(desc.getColumnName());
columnName = desc.getColumnName();
}
sqlBuilder.append(" = ?");
// Need to build a null-safe construct for the WHERE clause, since we are using PreparedStatement and won't know if the values are null. If they are null,
// then the filter should be "column IS null" vs "column = null". Since we don't know whether the value is null, we can use the following construct (from NIFI-3742):
// (column = ? OR (column is null AND ? is null))
sqlBuilder.append("(");
sqlBuilder.append(columnName);
sqlBuilder.append(" = ? OR (");
sqlBuilder.append(columnName);
sqlBuilder.append(" is null AND ? is null))");
includedColumns.add(i);
}

View File

@ -144,7 +144,7 @@ class TestPutDatabaseRecord {
assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
generateInsert(schema, 'PERSONS', tableSchema, settings).sql)
assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND code = ?',
assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
}
}
@ -608,6 +608,51 @@ class TestPutDatabaseRecord {
conn.close()
}
@Test
void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)
Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement()
stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101)")
stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null)")
stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103)")
stmt.close()
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)
parser.addRecord(2, 'rec2', null)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.DELETE_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
runner.enqueue(new byte[0])
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
stmt = conn.createStatement()
final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3))
assertTrue(rs.next())
assertEquals(3, rs.getInt(1))
assertEquals('rec3', rs.getString(2))
assertEquals(103, rs.getInt(3))
assertFalse(rs.next())
stmt.close()
conn.close()
}
private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
final Connection conn = dbcp.getConnection()
final Statement stmt = conn.createStatement()