NIFI-8530: Improved DELETE handling in PutDatabaseRecord for non-nullable columns

This closes #5173

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matthew Burgess 2021-06-21 16:26:41 -04:00 committed by exceptionfactory
parent 1de01e34da
commit e16016b4ab
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 34 additions and 18 deletions

View File

@ -697,6 +697,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final RecordSchema recordSchema = currentRecord.getSchema();
final Map<String, ColumnDescription> columns = tableSchema.getColumns();
int deleteIndex = 0;
for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i);
Object currentValue = values[currentFieldIndex];
@ -762,10 +763,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
currentValue = DataTypeUtils.convertDateToLocalTZ((Date) currentValue);
}
// If DELETE type, insert the object twice because of the null check (see generateDelete for details)
// If DELETE type, insert the object twice if the column is nullable because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
setParameter(ps, i * 2 + 1, currentValue, fieldSqlType, sqlType);
setParameter(ps, i * 2 + 2, currentValue, fieldSqlType, sqlType);
setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
if (column.isNullable()) {
setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
}
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
for (int j = 0; j < timesToAddObjects; j++) {
@ -1284,9 +1287,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
// (column = ? OR (column is null AND ? is null))
sqlBuilder.append("(");
sqlBuilder.append(columnName);
sqlBuilder.append(" = ? OR (");
sqlBuilder.append(columnName);
sqlBuilder.append(" is null AND ? is null))");
sqlBuilder.append(" = ?");
// Only need null check if the column is nullable, otherwise the row wouldn't exist
if (desc.isNullable()) {
sqlBuilder.append(" OR (");
sqlBuilder.append(columnName);
sqlBuilder.append(" is null AND ? is null))");
} else {
sqlBuilder.append(")");
}
includedColumns.add(i);
} else {
// User is ignoring unmapped fields, but log at debug level just in case
@ -1476,12 +1486,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
private final int dataType;
private final boolean required;
private final Integer columnSize;
private final boolean nullable;
public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize, final boolean nullable) {
this.columnName = columnName;
this.dataType = dataType;
this.required = required;
this.columnSize = columnSize;
this.nullable = nullable;
}
public int getDataType() {
@ -1500,6 +1512,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
return required;
}
public boolean isNullable() {
return nullable;
}
public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
final ResultSetMetaData md = resultSet.getMetaData();
List<String> columns = new ArrayList<>();
@ -1524,7 +1540,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize, isNullable);
}
@Override

View File

@ -145,9 +145,9 @@ class TestPutDatabaseRecord {
def tableSchema = [
[
new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
],
false,
['id'] as Set<String>,
@ -169,7 +169,7 @@ class TestPutDatabaseRecord {
assertEquals('UPDATE PERSONS SET name = ?, code = ? WHERE id = ?',
generateUpdate(schema, 'PERSONS', null, tableSchema, settings).sql)
assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
assertEquals('DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
}
}
@ -193,9 +193,9 @@ class TestPutDatabaseRecord {
def tableSchema = [
[
new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
],
false,
['id'] as Set<String>,
@ -1236,9 +1236,9 @@ class TestPutDatabaseRecord {
def tableSchema = [
[
new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
],
false,
['id'] as Set<String>,