NIFI-13642 Added Delete Keys Property to PutDatabaseRecord

- Delete Keys property enables targeted deletes for databases that do not support primary keys

This closes #9162

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Rajmund Takacs 2024-08-06 16:48:12 +02:00 committed by exceptionfactory
parent abe41ff649
commit d4344a3140
No known key found for this signature in database
2 changed files with 70 additions and 40 deletions

View File

@@ -16,8 +16,43 @@
*/
package org.apache.nifi.processors.standard;
import static java.lang.String.format;
import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.BatchUpdateException;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -60,40 +95,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.BatchUpdateException;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static java.lang.String.format;
import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"sql", "record", "jdbc", "put", "database", "update", "insert", "delete"})
@CapabilityDescription("The PutDatabaseRecord processor uses a specified RecordReader to input (possibly multiple) records from an incoming flow file. These records are translated to SQL "
@@ -306,6 +307,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
.dependsOn(STATEMENT_TYPE, UPDATE_TYPE, UPSERT_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH)
.build();
// Optional comma-separated column list used to build the WHERE clause of generated
// DELETE statements. When unset, all of the record's columns are matched (the original
// behavior). Only consulted when Statement Type resolves to DELETE — dependsOn also
// lists SQL/attribute/record-path statement types because the effective type may only
// be known at runtime in those modes. Supports FlowFile attribute expression language.
static final PropertyDescriptor DELETE_KEYS = new Builder()
.name("Delete Keys")
.description("A comma-separated list of column names that uniquely identifies a row in the database for DELETE statements. "
+ "If the Statement Type is DELETE and this property is not set, the table's columns are used. "
+ "This property is ignored if the Statement Type is not DELETE")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.dependsOn(STATEMENT_TYPE, DELETE_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH)
.build();
static final PropertyDescriptor FIELD_CONTAINING_SQL = new Builder()
.name("put-db-record-field-containing-sql")
.displayName("Field Containing SQL")
@@ -427,6 +439,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
UNMATCHED_FIELD_BEHAVIOR,
UNMATCHED_COLUMN_BEHAVIOR,
UPDATE_KEYS,
DELETE_KEYS,
FIELD_CONTAINING_SQL,
ALLOW_MULTIPLE_STATEMENTS,
QUOTE_IDENTIFIERS,
@@ -730,6 +743,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
final String deleteKeys = context.getProperty(DELETE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -795,7 +809,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
} else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
} else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings);
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
} else if (INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
@@ -1443,7 +1457,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
}
SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, String deleteKeys, final TableSchema tableSchema, final DMLSettings settings)
throws IllegalArgumentException, MalformedRecordException, SQLDataException {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
@@ -1465,14 +1479,28 @@
sqlBuilder.append(tableName);
// iterate over all of the fields in the record, building the SQL statement by adding the column names
List<String> fieldNames = recordSchema.getFieldNames();
final List<String> fieldNames = recordSchema.getFieldNames();
final List<Integer> includedColumns = new ArrayList<>();
if (fieldNames != null) {
sqlBuilder.append(" WHERE ");
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
// If 'deleteKeys' is not specified by the user, then all columns of the table
// should be used in the 'WHERE' clause, in order to keep the original behavior.
final Set<String> deleteKeysSet;
if (deleteKeys == null) {
deleteKeysSet = new HashSet<>(fieldNames);
} else {
deleteKeysSet = Arrays.stream(deleteKeys.split(","))
.map(String::trim)
.collect(Collectors.toSet());
}
for (int i = 0; i < fieldCount; i++) {
if (!deleteKeysSet.contains(fieldNames.get(i))) {
continue; // skip this field if it should not be included in 'WHERE'
}
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();

View File

@@ -405,7 +405,9 @@ public class PutDatabaseRecordTest {
assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?",
processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings).getSql());
assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))",
processor.generateDelete(schema, "PERSONS", tableSchema, settings).getSql());
processor.generateDelete(schema, "PERSONS", null, tableSchema, settings).getSql());
assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR (code is null AND ? is null))",
processor.generateDelete(schema, "PERSONS", "id, code", tableSchema, settings).getSql());
}
@Test
@@ -450,7 +452,7 @@ public class PutDatabaseRecordTest {
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage());
e = assertThrows(SQLDataException.class,
() -> processor.generateDelete(schema, "PERSONS", tableSchema, settings),
() -> processor.generateDelete(schema, "PERSONS", null, tableSchema, settings),
"generateDelete should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage());
}
@@ -2495,4 +2497,4 @@ public class PutDatabaseRecordTest {
"; batchSize=" + String.valueOf(batchSize);
}
}
}
}