diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 0f65014bc5..a591122bfd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -646,12 +646,24 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { if (values != null) { if (fieldIndexes != null) { for (int i = 0; i < fieldIndexes.size(); i++) { - ps.setObject(i + 1, values[fieldIndexes.get(i)]); + // If DELETE type, insert the object twice because of the null check (see generateDelete for details) + if (DELETE_TYPE.equalsIgnoreCase(statementType)) { + ps.setObject(i * 2 + 1, values[fieldIndexes.get(i)]); + ps.setObject(i * 2 + 2, values[fieldIndexes.get(i)]); + } else { + ps.setObject(i + 1, values[fieldIndexes.get(i)]); + } } } else { // If there's no index map, assume all values are included and set them in order for (int i = 0; i < values.length; i++) { - ps.setObject(i + 1, values[i]); + // If DELETE type, insert the object twice because of the null check (see generateDelete for details) + if (DELETE_TYPE.equalsIgnoreCase(statementType)) { + ps.setObject(i * 2 + 1, values[i]); + ps.setObject(i * 2 + 2, values[i]); + } else { + ps.setObject(i + 1, values[i]); + } } } ps.addBatch(); @@ -935,14 +947,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { sqlBuilder.append(" AND "); } + String columnName; if (settings.escapeColumnNames) { - sqlBuilder.append(tableSchema.getQuotedIdentifierString()) - .append(desc.getColumnName()) - .append(tableSchema.getQuotedIdentifierString()); + columnName = tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString(); } else { - sqlBuilder.append(desc.getColumnName()); + columnName = desc.getColumnName(); } - sqlBuilder.append(" = ?"); + // Need to build a null-safe construct for the WHERE clause, since we are using PreparedStatement and won't know if the values are null. If they are null, + // then the filter should be "column IS null" vs "column = null". Since we don't know whether the value is null, we can use the following construct (from NIFI-3742): + // (column = ? OR (column is null AND ? is null)) + sqlBuilder.append("("); + sqlBuilder.append(columnName); + sqlBuilder.append(" = ? OR ("); + sqlBuilder.append(columnName); + sqlBuilder.append(" is null AND ? is null))"); includedColumns.add(i); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index e99f43ab9f..355f19275c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -144,7 +144,7 @@ class TestPutDatabaseRecord { assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)', generateInsert(schema, 'PERSONS', tableSchema, settings).sql) - assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND code = ?', + assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))', generateDelete(schema, 'PERSONS', tableSchema, settings).sql) } } @@ -608,6 +608,51 @@ class TestPutDatabaseRecord { conn.close() } + @Test + void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + Connection conn = dbcp.getConnection() + Statement stmt = conn.createStatement() + stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101)") + stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null)") + stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103)") + stmt.close() + + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + parser.addRecord(2, 'rec2', null) + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.DELETE_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + stmt = conn.createStatement() + final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') + assertTrue(rs.next()) + assertEquals(1, rs.getInt(1)) + assertEquals('rec1', rs.getString(2)) + assertEquals(101, rs.getInt(3)) + assertTrue(rs.next()) + assertEquals(3, rs.getInt(1)) + assertEquals('rec3', rs.getString(2)) + assertEquals(103, rs.getInt(3)) + assertFalse(rs.next()) + + stmt.close() + conn.close() + } + + private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException { final Connection conn = dbcp.getConnection() final Statement stmt = conn.createStatement()