diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 1fae8c723e..4626766400 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -697,6 +697,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final RecordSchema recordSchema = currentRecord.getSchema(); final Map columns = tableSchema.getColumns(); + int deleteIndex = 0; for (int i = 0; i < fieldIndexes.size(); i++) { final int currentFieldIndex = fieldIndexes.get(i); Object currentValue = values[currentFieldIndex]; @@ -762,10 +763,12 @@ public class PutDatabaseRecord extends AbstractProcessor { currentValue = DataTypeUtils.convertDateToLocalTZ((Date) currentValue); } - // If DELETE type, insert the object twice because of the null check (see generateDelete for details) + // If DELETE type, insert the object twice if the column is nullable because of the null check (see generateDelete for details) if (DELETE_TYPE.equalsIgnoreCase(statementType)) { - setParameter(ps, i * 2 + 1, currentValue, fieldSqlType, sqlType); - setParameter(ps, i * 2 + 2, currentValue, fieldSqlType, sqlType); + setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType); + if (column.isNullable()) { + setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType); + } } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) { final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert(); for (int j = 0; j < timesToAddObjects; j++) { @@ -1284,9 +1287,16 @@ public class PutDatabaseRecord extends AbstractProcessor { // (column = ? OR (column is null AND ? is null)) sqlBuilder.append("("); sqlBuilder.append(columnName); - sqlBuilder.append(" = ? OR ("); - sqlBuilder.append(columnName); - sqlBuilder.append(" is null AND ? is null))"); + sqlBuilder.append(" = ?"); + + // Only need null check if the column is nullable, otherwise the row wouldn't exist + if (desc.isNullable()) { + sqlBuilder.append(" OR ("); + sqlBuilder.append(columnName); + sqlBuilder.append(" is null AND ? is null))"); + } else { + sqlBuilder.append(")"); + } includedColumns.add(i); } else { // User is ignoring unmapped fields, but log at debug level just in case @@ -1476,12 +1486,14 @@ public class PutDatabaseRecord extends AbstractProcessor { private final int dataType; private final boolean required; private final Integer columnSize; + private final boolean nullable; - public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) { + public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize, final boolean nullable) { this.columnName = columnName; this.dataType = dataType; this.required = required; this.columnSize = columnSize; + this.nullable = nullable; } public int getDataType() { @@ -1500,6 +1512,10 @@ public class PutDatabaseRecord extends AbstractProcessor { return required; } + public boolean isNullable() { + return nullable; + } + public static ColumnDescription from(final ResultSet resultSet) throws SQLException { final ResultSetMetaData md = resultSet.getMetaData(); List columns = new ArrayList<>(); @@ -1524,7 +1540,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue); final boolean required = !isNullable && !isAutoIncrement && defaultValue == null; - return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize); + return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize, isNullable); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index d3ad032d52..623e8d15cd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -145,9 +145,9 @@ class TestPutDatabaseRecord { def tableSchema = [ [ - new PutDatabaseRecord.ColumnDescription('id', 4, true, 2), - new PutDatabaseRecord.ColumnDescription('name', 12, true, 255), - new PutDatabaseRecord.ColumnDescription('code', 4, true, 10) + new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false), + new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true), + new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true) ], false, ['id'] as Set, @@ -169,7 +169,7 @@ class TestPutDatabaseRecord { assertEquals('UPDATE PERSONS SET name = ?, code = ? WHERE id = ?', generateUpdate(schema, 'PERSONS', null, tableSchema, settings).sql) - assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))', + assertEquals('DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))', generateDelete(schema, 'PERSONS', tableSchema, settings).sql) } } @@ -193,9 +193,9 @@ class TestPutDatabaseRecord { def tableSchema = [ [ - new PutDatabaseRecord.ColumnDescription('id', 4, true, 2), - new PutDatabaseRecord.ColumnDescription('name', 12, true, 255), - new PutDatabaseRecord.ColumnDescription('code', 4, true, 10) + new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false), + new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true), + new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true) ], false, ['id'] as Set, @@ -1236,9 +1236,9 @@ class TestPutDatabaseRecord { def tableSchema = [ [ - new PutDatabaseRecord.ColumnDescription('id', 4, true, 2), - new PutDatabaseRecord.ColumnDescription('name', 12, true, 255), - new PutDatabaseRecord.ColumnDescription('code', 4, true, 10) + new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false), + new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true), + new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true) ], false, ['id'] as Set,