NIFI-11858 Configurable Column Name Normalization in PutDatabaseRecord and UpdateDatabaseTable

Cleaned up and applied the required changes for https://github.com/apache/nifi/pull/8995

Updated the property description to clarify that column names are converted to uppercase for case-insensitive matching, irrespective of the selected strategy

Added examples for the REMOVE_ALL_SPECIAL_CHAR and PATTERN strategies

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #9382
This commit is contained in:
ravisingh 2024-10-11 17:38:00 -07:00 committed by Matt Burgess
parent 0271e926eb
commit 502572b2f5
14 changed files with 499 additions and 123 deletions

View File

@ -23,6 +23,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -47,12 +48,15 @@ import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -79,7 +83,10 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
@ -184,25 +191,26 @@ public class PutDatabaseRecord extends AbstractProcessor {
.build();
static final PropertyDescriptor STATEMENT_TYPE_RECORD_PATH = new Builder()
.name("Statement Type Record Path")
.displayName("Statement Type Record Path")
.description("Specifies a RecordPath to evaluate against each Record in order to determine the Statement Type. The RecordPath should equate to either INSERT, UPDATE, UPSERT, or DELETE. "
+ "(Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)")
.required(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
.dependsOn(STATEMENT_TYPE, USE_RECORD_PATH)
.build();
.name("Statement Type Record Path")
.displayName("Statement Type Record Path")
.description("Specifies a RecordPath to evaluate against each Record in order to determine the Statement Type. The RecordPath should equate to either INSERT, UPDATE, UPSERT, or DELETE. "
+ "(Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)")
.required(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
.dependsOn(STATEMENT_TYPE, USE_RECORD_PATH)
.build();
static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
.name("Data Record Path")
.displayName("Data Record Path")
.description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
" the database instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to the database.")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
.build();
.name("Data Record Path")
.displayName("Data Record Path")
.description("If specified, this property denotes a RecordPath that will be evaluated against each incoming" +
" Record and the Record that results from evaluating the RecordPath will be sent to" +
" the database instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to the database.")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
.build();
static final PropertyDescriptor DBCP_SERVICE = new Builder()
.name("put-db-record-dcbp-service")
@ -277,6 +285,25 @@ public class PutDatabaseRecord extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor TRANSLATION_STRATEGY = new PropertyDescriptor.Builder()
.required(true)
.name("Column Name Translation Strategy")
.description("The strategy used to normalize table column name. Column Name will be uppercased to " +
"do case-insensitive matching irrespective of strategy")
.allowableValues(TranslationStrategy.class)
.defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue())
.dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue())
.build();
public static final PropertyDescriptor TRANSLATION_PATTERN = new PropertyDescriptor.Builder()
.required(true)
.name("Column Name Translation Pattern")
.description("Column name will be normalized with this regular expression")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue())
.dependsOn(TRANSLATION_STRATEGY, TranslationStrategy.PATTERN.getValue())
.build();
static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new Builder()
.name("put-db-record-unmatched-field-behavior")
@ -415,14 +442,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
});
DB_TYPE = new Builder()
.name("db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[0]))
.defaultValue("Generic")
.required(false)
.build();
.name("db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[0]))
.defaultValue("Generic")
.required(false)
.build();
properties = List.of(
RECORD_READER_FACTORY,
@ -436,6 +463,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
TABLE_NAME,
BINARY_STRING_FORMAT,
TRANSLATE_FIELD_NAMES,
TRANSLATION_STRATEGY,
TRANSLATION_PATTERN,
UNMATCHED_FIELD_BEHAVIOR,
UNMATCHED_COLUMN_BEHAVIOR,
UPDATE_KEYS,
@ -473,12 +502,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
DatabaseAdapter databaseAdapter = dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
if ((UPSERT_TYPE.equals(statementType) && !databaseAdapter.supportsUpsert())
|| (INSERT_IGNORE_TYPE.equals(statementType) && !databaseAdapter.supportsInsertIgnore())) {
|| (INSERT_IGNORE_TYPE.equals(statementType) && !databaseAdapter.supportsInsertIgnore())) {
validationResults.add(new ValidationResult.Builder()
.subject(STATEMENT_TYPE.getDisplayName())
.valid(false)
.explanation(databaseAdapter.getName() + " does not support " + statementType)
.build()
.subject(STATEMENT_TYPE.getDisplayName())
.valid(false)
.explanation(databaseAdapter.getName() + " does not support " + statementType)
.build()
);
}
@ -495,15 +524,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
if (autoCommit != null && autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'."
+ " Batch size equal to zero executes all statements in a single transaction"
+ " which allows rollback to revert all the flow file's statements together if an error occurs.",
MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName());
final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'."
+ " Batch size equal to zero executes all statements in a single transaction"
+ " which allows rollback to revert all the flow file's statements together if an error occurs.",
MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName());
validationResults.add(new ValidationResult.Builder()
.subject(MAX_BATCH_SIZE.getDisplayName())
.explanation(explanation)
.build());
validationResults.add(new ValidationResult.Builder()
.subject(MAX_BATCH_SIZE.getDisplayName())
.explanation(explanation)
.build());
}
return validationResults;
@ -692,7 +721,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final String regex = "(?<!\\\\);";
sqlStatements = (sql).split(regex);
} else {
sqlStatements = new String[] {sql};
sqlStatements = new String[]{sql};
}
if (isFirstRecord) {
@ -735,7 +764,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
private void executeDML(final ProcessContext context, final ProcessSession session, final FlowFile flowFile,
final Connection con, final RecordReader recordReader, final String explicitStatementType, final DMLSettings settings)
throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
final ComponentLog log = getLogger();
@ -753,13 +782,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
}
final NameNormalizer normalizer = Optional.of(settings)
.filter(s -> s.translateFieldNames)
.map(s -> NameNormalizerFactory.getNormalizer(s.translationStrategy, s.translationPattern))
.orElse(null);
final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
final TableSchema tableSchema;
try {
tableSchema = schemaCache.get(schemaKey, key -> {
try {
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, updateKeys, log);
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, normalizer, updateKeys, log);
getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
return schema;
} catch (SQLException e) {
@ -780,7 +812,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
// build the fully qualified table name
final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema);
final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema);
final Map<String, PreparedSqlAndColumns> preparedSql = new HashMap<>();
int currentBatchSize = 0;
@ -805,15 +837,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
final SqlAndIncludedColumns sqlHolder;
if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings, normalizer);
} else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
} else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings);
sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings, normalizer);
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
} else if (INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateInsertIgnore(recordSchema, fqTableName, updateKeys, tableSchema, settings);
sqlHolder = generateInsertIgnore(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
} else {
throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
}
@ -843,7 +875,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (ps != lastPreparedStatement && lastPreparedStatement != null) {
batchIndex++;
log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
lastPreparedStatement.executeBatch();
session.adjustCounter("Batches Executed", 1, false);
@ -863,7 +895,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final DataType dataType = dataTypes.get(currentFieldIndex);
final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
final String fieldName = recordSchema.getField(currentFieldIndex).getFieldName();
String columnName = ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
String columnName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
int sqlType;
final ColumnDescription column = columns.get(columnName);
@ -952,7 +984,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (++currentBatchSize == maxBatchSize) {
batchIndex++;
log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
session.adjustCounter("Batches Executed", 1, false);
ps.executeBatch();
currentBatchSize = 0;
@ -1081,7 +1113,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
if (fieldType != RecordFieldType.RECORD) {
throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
" " + fieldType);
" " + fieldType);
}
}
@ -1185,18 +1217,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
return tableNameBuilder.toString();
}
private Set<String> getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames) {
private Set<String> getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames, NameNormalizer normalizer) {
final Set<String> normalizedFieldNames = new HashSet<>();
if (schema != null) {
schema.getFieldNames().forEach((fieldName) -> normalizedFieldNames.add(ColumnDescription.normalizeColumnName(fieldName, translateFieldNames)));
schema.getFieldNames().forEach((fieldName) -> normalizedFieldNames.add(TableSchema.normalizedName(fieldName, translateFieldNames, normalizer)));
}
return normalizedFieldNames;
}
SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer)
throws IllegalArgumentException, SQLException {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
checkValuesForRequiredColumns(recordSchema, tableSchema, settings, normalizer);
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("INSERT INTO ");
@ -1214,7 +1246,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames));
final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@ -1254,13 +1286,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
SqlAndIncludedColumns generateUpsert(final RecordSchema recordSchema, final String tableName, final String updateKeys,
final TableSchema tableSchema, final DMLSettings settings)
throws IllegalArgumentException, SQLException, MalformedRecordException {
final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer)
throws IllegalArgumentException, SQLException, MalformedRecordException {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
checkValuesForRequiredColumns(recordSchema, tableSchema, settings, normalizer);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, normalizer);
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
@ -1273,7 +1305,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames));
final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@ -1308,13 +1340,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
SqlAndIncludedColumns generateInsertIgnore(final RecordSchema recordSchema, final String tableName, final String updateKeys,
final TableSchema tableSchema, final DMLSettings settings)
final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer)
throws IllegalArgumentException, SQLException, MalformedRecordException {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
checkValuesForRequiredColumns(recordSchema, tableSchema, settings, normalizer);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, normalizer);
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
@ -1327,7 +1359,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames));
final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@ -1363,11 +1395,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, final String tableName, final String updateKeys,
final TableSchema tableSchema, final DMLSettings settings)
final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer)
throws IllegalArgumentException, MalformedRecordException, SQLException {
final Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
final Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
final Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, normalizer);
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE ");
@ -1386,8 +1418,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
final String normalizedColName = ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames));
final String normalizedColName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer));
if (desc == null) {
if (!settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
@ -1426,8 +1458,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
String fieldName = field.getFieldName();
boolean firstUpdateKey = true;
final String normalizedColName = ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames));
final String normalizedColName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer));
if (desc != null) {
// Check if this column is a Update Key. If so, add it to the WHERE clause
@ -1457,12 +1489,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
}
SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, String deleteKeys, final TableSchema tableSchema, final DMLSettings settings)
SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, String deleteKeys, final TableSchema tableSchema,
final DMLSettings settings, NameNormalizer normalizer)
throws IllegalArgumentException, MalformedRecordException, SQLDataException {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames, normalizer);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
final String normalizedColName = ColumnDescription.normalizeColumnName(requiredColName, settings.translateFieldNames);
final String normalizedColName = TableSchema.normalizedName(requiredColName, settings.translateFieldNames, normalizer);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
if (settings.failUnmappedColumns) {
@ -1505,7 +1538,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames));
final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@ -1554,11 +1587,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
}
private void checkValuesForRequiredColumns(RecordSchema recordSchema, TableSchema tableSchema, DMLSettings settings) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
private void checkValuesForRequiredColumns(RecordSchema recordSchema, TableSchema tableSchema, DMLSettings settings, NameNormalizer normalizer) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames, normalizer);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
final String normalizedColName = ColumnDescription.normalizeColumnName(requiredColName, settings.translateFieldNames);
final String normalizedColName = TableSchema.normalizedName(requiredColName, settings.translateFieldNames, normalizer);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
if (settings.failUnmappedColumns) {
@ -1590,15 +1623,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
return updateKeyColumnNames;
}
private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set<String> updateKeyColumnNames)
private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set<String> updateKeyColumnNames, NameNormalizer normalizer)
throws MalformedRecordException {
// Create a Set of all normalized Update Key names, and ensure that there is a field in the record
// for each of the Update Key fields.
final Set<String> normalizedRecordFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
final Set<String> normalizedRecordFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames, normalizer);
final Set<String> normalizedKeyColumnNames = new HashSet<>();
for (final String updateKeyColumnName : updateKeyColumnNames) {
String normalizedKeyColumnName = ColumnDescription.normalizeColumnName(updateKeyColumnName, settings.translateFieldNames);
String normalizedKeyColumnName = TableSchema.normalizedName(updateKeyColumnName, settings.translateFieldNames, normalizer);
if (!normalizedRecordFieldNames.contains(normalizedKeyColumnName)) {
String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + updateKeyColumnName + "'";
@ -1651,7 +1684,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
SchemaKey schemaKey = (SchemaKey) o;
if (catalog != null ? !catalog.equals(schemaKey.catalog) : schemaKey.catalog != null) return false;
if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) return false;
if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null)
return false;
return tableName.equals(schemaKey.tableName);
}
}
@ -1736,6 +1770,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
static class DMLSettings {
private final boolean translateFieldNames;
private final TranslationStrategy translationStrategy;
private final Pattern translationPattern;
private final boolean ignoreUnmappedFields;
// Is the unmatched column behaviour fail or warning?
@ -1750,6 +1786,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
DMLSettings(ProcessContext context) {
translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
translationStrategy = TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue());
final String translationRegex = context.getProperty(TRANSLATION_PATTERN).getValue();
translationPattern = translationRegex == null ? null : Pattern.compile(translationRegex);
ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());

View File

@ -40,8 +40,11 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.TableNotFoundException;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@ -71,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
@ -177,6 +181,27 @@ public class UpdateDatabaseTable extends AbstractProcessor {
.defaultValue("true")
.build();
public static final PropertyDescriptor TRANSLATION_STRATEGY = new PropertyDescriptor.Builder()
.required(true)
.name("Column Name Translation Strategy")
.description("The strategy used to normalize table column name. Column Name will be uppercased to " +
"do case-insensitive matching irrespective of strategy")
.allowableValues(TranslationStrategy.class)
.defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue())
.dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue())
.build();
public static final PropertyDescriptor TRANSLATION_PATTERN = new PropertyDescriptor.Builder()
.name("Column Name Translation Pattern")
.displayName("Column Name Translation Pattern")
.description("Column name will be normalized with this regular expression")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue())
.dependsOn(TRANSLATION_STRATEGY, TranslationStrategy.PATTERN.getValue())
.build();
static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("updatedatabasetable-update-field-names")
.displayName("Update Field Names")
@ -282,6 +307,8 @@ public class UpdateDatabaseTable extends AbstractProcessor {
CREATE_TABLE,
PRIMARY_KEY_FIELDS,
TRANSLATE_FIELD_NAMES,
TRANSLATION_STRATEGY,
TRANSLATION_PATTERN,
UPDATE_FIELD_NAMES,
RECORD_WRITER_FACTORY,
QUOTE_TABLE_IDENTIFIER,
@ -371,6 +398,13 @@ public class UpdateDatabaseTable extends AbstractProcessor {
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
final TranslationStrategy translationStrategy = TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue());
final String translationRegex = context.getProperty(TRANSLATION_PATTERN).getValue();
final Pattern translationPattern = translationRegex == null ? null : Pattern.compile(translationRegex);
NameNormalizer normalizer = null;
if (translateFieldNames) {
normalizer = NameNormalizerFactory.getNormalizer(translationStrategy, translationPattern);
}
if (recordWriterFactory == null && updateFieldNames) {
throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
}
@ -393,7 +427,8 @@ public class UpdateDatabaseTable extends AbstractProcessor {
primaryKeyColumnNames = null;
}
final OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(connection, databaseAdapter, recordSchema,
catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, updateFieldNames, primaryKeyColumnNames, quoteTableName, quoteColumnNames);
catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, normalizer,
updateFieldNames, primaryKeyColumnNames, quoteTableName, quoteColumnNames);
if (outputMetadataHolder != null) {
// The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
try {
@ -457,15 +492,16 @@ public class UpdateDatabaseTable extends AbstractProcessor {
private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connection conn, final DatabaseAdapter databaseAdapter, final RecordSchema schema,
final String catalogName, final String schemaName, final String tableName,
final boolean createIfNotExists, final boolean translateFieldNames, final boolean updateFieldNames,
final Set<String> primaryKeyColumnNames, final boolean quoteTableName, final boolean quoteColumnNames) throws IOException {
final boolean createIfNotExists, final boolean translateFieldNames, final NameNormalizer normalizer,
final boolean updateFieldNames, final Set<String> primaryKeyColumnNames, final boolean quoteTableName,
final boolean quoteColumnNames) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (final Statement s = conn.createStatement()) {
// Determine whether the table exists
TableSchema tableSchema = null;
try {
tableSchema = TableSchema.from(conn, catalogName, schemaName, tableName, translateFieldNames, null, getLogger());
tableSchema = TableSchema.from(conn, catalogName, schemaName, tableName, translateFieldNames, normalizer, null, getLogger());
} catch (TableNotFoundException tnfe) {
// Do nothing, the value will be populated if necessary
}
@ -483,7 +519,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
getLogger().debug("Adding column {} to table {}", recordFieldName, tableName);
}
tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString());
tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, normalizer, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString());
final String createTableSql = databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName, quoteColumnNames);
@ -502,7 +538,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
final List<String> dbColumns = new ArrayList<>();
for (final ColumnDescription columnDescription : tableSchema.getColumnsAsList()) {
dbColumns.add(ColumnDescription.normalizeColumnName(columnDescription.getColumnName(), translateFieldNames));
dbColumns.add(TableSchema.normalizedName(columnDescription.getColumnName(), translateFieldNames, normalizer));
}
final List<ColumnDescription> columnsToAdd = new ArrayList<>();
@ -511,7 +547,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
// Handle new columns
for (RecordField recordField : schema.getFields()) {
final String recordFieldName = recordField.getFieldName();
final String normalizedFieldName = ColumnDescription.normalizeColumnName(recordFieldName, translateFieldNames);
final String normalizedFieldName = TableSchema.normalizedName(recordFieldName, translateFieldNames, normalizer);
if (!dbColumns.contains(normalizedFieldName)) {
// The field does not exist in the table, add it
ColumnDescription columnToAdd = new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()),

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db;
/**
 * Strategy interface for normalizing database column names so that record field
 * names can be matched against table column names irrespective of separators or
 * special characters.
 */
@FunctionalInterface
public interface NameNormalizer {

    /**
     * Returns the normalized form of the given column name.
     *
     * @param colName the raw column name to normalize
     * @return the normalized column name
     */
    String getNormalizedName(String colName);
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db;
import org.apache.nifi.processors.standard.db.impl.PatternNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveAllSpecialCharNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveSpaceNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreAndSpaceNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreNormalizer;
import java.util.regex.Pattern;
/**
 * Factory that creates the {@link NameNormalizer} implementation corresponding to a
 * {@link TranslationStrategy}. Normalizers transform column names so that record
 * fields can be matched to database columns.
 */
public final class NameNormalizerFactory {

    private NameNormalizerFactory() {
        // Utility class; prevent instantiation
    }

    /**
     * Returns the normalizer for the given strategy.
     *
     * @param strategy the translation strategy to apply; must not be null
     * @param regex    the pattern of characters to remove; only used for
     *                 {@link TranslationStrategy#PATTERN} (may be null otherwise)
     * @return a {@link NameNormalizer} implementing the requested strategy
     */
    public static NameNormalizer getNormalizer(TranslationStrategy strategy, Pattern regex) {
        return switch (strategy) {
            case REMOVE_UNDERSCORE -> new RemoveUnderscoreNormalizer();
            case REMOVE_SPACE -> new RemoveSpaceNormalizer();
            case REMOVE_UNDERSCORE_AND_SPACE -> new RemoveUnderscoreAndSpaceNormalizer();
            case REMOVE_ALL_SPECIAL_CHAR -> new RemoveAllSpecialCharNormalizer();
            case PATTERN -> new PatternNormalizer(regex);
        };
    }
}

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
public class TableSchema {
private final List<String> requiredColumnNames;
private final Set<String> primaryKeyColumnNames;
@ -38,18 +39,20 @@ public class TableSchema {
private final String schemaName;
private final String tableName;
public TableSchema(final String catalogName, final String schemaName, final String tableName, final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
public TableSchema(final String catalogName, final String schemaName, final String tableName,
final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final NameNormalizer normalizer,
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
this.columns = new LinkedHashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.quotedIdentifierString = quotedIdentifierString;
this.requiredColumnNames = new ArrayList<>();
for (final ColumnDescription desc : columnDescriptions) {
columns.put(ColumnDescription.normalizeColumnName(desc.getColumnName(), translateColumnNames), desc);
final String colName = normalizedName(desc.getColumnName(), translateColumnNames, normalizer);
columns.put(colName, desc);
if (desc.isRequired()) {
requiredColumnNames.add(desc.getColumnName());
}
@ -89,7 +92,8 @@ public class TableSchema {
}
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final String updateKeys, ComponentLog log) throws SQLException {
final boolean translateColumnNames, final NameNormalizer normalizer,
final String updateKeys, ComponentLog log) throws SQLException {
final DatabaseMetaData dmd = conn.getMetaData();
try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
@ -136,14 +140,23 @@ public class TableSchema {
} else {
// Parse the Update Keys field and normalize the column names
for (final String updateKey : updateKeys.split(",")) {
primaryKeyColumns.add(ColumnDescription.normalizeColumnName(updateKey.trim(), translateColumnNames));
final String colName = normalizedName(updateKey, translateColumnNames, normalizer);
primaryKeyColumns.add(colName);
}
}
return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, normalizer, primaryKeyColumns, dmd.getIdentifierQuoteString());
}
}
/**
 * Normalizes a column/field name for case-insensitive matching between record
 * fields and database columns.
 *
 * @param name           the raw column or field name
 * @param translateNames whether name translation is enabled
 * @param normalizer     the normalizer to apply; may be null
 * @return the normalized, trimmed, uppercased name when translation is enabled and a
 *         normalizer is supplied; otherwise the original name unchanged
 */
public static String normalizedName(final String name, final boolean translateNames, final NameNormalizer normalizer) {
    if (translateNames && normalizer != null) {
        // Uppercased regardless of strategy so that matching is case-insensitive
        // NOTE(review): toUpperCase() is locale-sensitive (e.g. Turkish dotless i) —
        // consider Locale.ROOT; confirm against how other call sites uppercase names
        return normalizer.getNormalizedName(name).trim().toUpperCase();
    }
    return name;
}
@Override
public String toString() {
return "TableSchema[columns=" + columns.values() + "]";

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db;
import org.apache.nifi.components.DescribedValue;
/**
* Enumeration of supported Database column name Translation Strategies
*/
public enum TranslationStrategy implements DescribedValue {
    // Strip underscores: 'Pics_1_23' -> 'PICS123' (normalized names are also uppercased)
    REMOVE_UNDERSCORE("Remove Underscore",
            "Underscores '_' will be removed from column names Ex: 'Pics_1_23' becomes 'PICS123'"),
    // Strip spaces: 'User Name' -> 'USERNAME'
    REMOVE_SPACE("Remove Space",
            "Spaces will be removed from column names Ex. 'User Name' becomes 'USERNAME'"),
    // Strip both underscores and spaces: 'User_1 Name' -> 'USER1NAME'
    REMOVE_UNDERSCORE_AND_SPACE("Remove Underscores and Spaces",
            "Spaces and Underscores will be removed from column names Ex. 'User_1 Name' becomes 'USER1NAME'"),
    // Strip every non-alphanumeric character (display name/description corrected: this
    // strategy removes special characters, it is not about regular expressions)
    REMOVE_ALL_SPECIAL_CHAR("Remove All Special Characters",
            "All special characters (anything other than letters and digits) will be removed from column names " +
                    "Ex. 'user-id' becomes 'USERID', 'total(estimated)' becomes 'TOTALESTIMATED'"),
    // Strip characters matching a user-supplied regular expression
    PATTERN("Regular Expression",
            "Remove characters matching this Regular Expression from the column names Ex. " +
                    "1. '\\d' will Remove all numbers " +
                    "2. '[^a-zA-Z0-9_]' will remove special characters except underscore");

    private final String displayName;
    private final String description;

    TranslationStrategy(String displayName, String description) {
        this.displayName = displayName;
        this.description = description;
    }

    /**
     * @return the enum constant name, used as the stored property value
     */
    @Override
    public String getValue() {
        return name();
    }

    @Override
    public String getDisplayName() {
        return displayName;
    }

    @Override
    public String getDescription() {
        return description;
    }
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import java.util.regex.Pattern;
/**
 * {@link NameNormalizer} that strips every character matching a user-supplied
 * regular expression from the column name. A null pattern leaves names untouched.
 */
public class PatternNormalizer implements NameNormalizer {

    private final Pattern pattern;

    public PatternNormalizer(Pattern pattern) {
        this.pattern = pattern;
    }

    @Override
    public String getNormalizedName(String colName) {
        // No pattern configured: pass the name through unchanged
        return (pattern == null) ? colName : pattern.matcher(colName).replaceAll("");
    }
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import java.util.regex.Pattern;
/**
 * {@link NameNormalizer} that strips every non-alphanumeric character from a column name.
 */
public class RemoveAllSpecialCharNormalizer implements NameNormalizer {

    // Matches any character that is not an ASCII letter or digit; compiled once and reused
    private static final Pattern NON_ALPHANUMERIC = Pattern.compile("[^a-zA-Z0-9]");

    @Override
    public String getNormalizedName(String colName) {
        return NON_ALPHANUMERIC.matcher(colName).replaceAll("");
    }
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.NameNormalizer;
/**
 * {@link NameNormalizer} that strips every space character from a column name.
 */
public class RemoveSpaceNormalizer implements NameNormalizer {

    @Override
    public String getNormalizedName(String colName) {
        // Copy every character except spaces into the result
        final StringBuilder normalized = new StringBuilder(colName.length());
        for (int i = 0; i < colName.length(); i++) {
            final char c = colName.charAt(i);
            if (c != ' ') {
                normalized.append(c);
            }
        }
        return normalized.toString();
    }
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.NameNormalizer;
/**
 * {@link NameNormalizer} that strips both underscore and space characters from a column name.
 */
public class RemoveUnderscoreAndSpaceNormalizer implements NameNormalizer {

    @Override
    public String getNormalizedName(String colName) {
        // Strip spaces first, then underscores (order does not affect the result)
        return colName.replace(" ", "").replace("_", "");
    }
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.NameNormalizer;
/**
 * {@link NameNormalizer} that strips every underscore character from a column name.
 */
public class RemoveUnderscoreNormalizer implements NameNormalizer {

    @Override
    public String getNormalizedName(String colName) {
        // Copy every character except underscores into the result
        final StringBuilder normalized = new StringBuilder(colName.length());
        for (int i = 0; i < colName.length(); i++) {
            final char c = colName.charAt(i);
            if (c != '_') {
                normalized.append(c);
            }
        }
        return normalized.toString();
    }
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.MalformedRecordException;
@ -388,7 +389,7 @@ public class PutDatabaseRecordTest {
new ColumnDescription("name", 12, true, 255, true),
new ColumnDescription("code", 4, true, 10, true)
),
false,
false, null,
new HashSet<>(Arrays.asList("id")),
""
);
@ -401,13 +402,13 @@ public class PutDatabaseRecordTest {
final PutDatabaseRecord.DMLSettings settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext());
assertEquals("INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)",
processor.generateInsert(schema, "PERSONS", tableSchema, settings).getSql());
processor.generateInsert(schema, "PERSONS", tableSchema, settings, null).getSql());
assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?",
processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings).getSql());
processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings, null).getSql());
assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))",
processor.generateDelete(schema, "PERSONS", null, tableSchema, settings).getSql());
processor.generateDelete(schema, "PERSONS", null, tableSchema, settings, null).getSql());
assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR (code is null AND ? is null))",
processor.generateDelete(schema, "PERSONS", "id, code", tableSchema, settings).getSql());
processor.generateDelete(schema, "PERSONS", "id, code", tableSchema, settings, null).getSql());
}
@Test
@ -429,7 +430,7 @@ public class PutDatabaseRecordTest {
new ColumnDescription("name", 12, true, 255, true),
new ColumnDescription("code", 4, true, 10, true)
),
false,
false, null,
new HashSet<>(Arrays.asList("id")),
""
);
@ -442,17 +443,17 @@ public class PutDatabaseRecordTest {
final PutDatabaseRecord.DMLSettings settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext());
SQLDataException e = assertThrows(SQLDataException.class,
() -> processor.generateInsert(schema, "PERSONS", tableSchema, settings),
() -> processor.generateInsert(schema, "PERSONS", tableSchema, settings, null),
"generateInsert should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage());
e = assertThrows(SQLDataException.class,
() -> processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings),
() -> processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings, null),
"generateUpdate should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage());
e = assertThrows(SQLDataException.class,
() -> processor.generateDelete(schema, "PERSONS", null, tableSchema, settings),
() -> processor.generateDelete(schema, "PERSONS", null, tableSchema, settings, null),
"generateDelete should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage());
}
@ -725,7 +726,7 @@ public class PutDatabaseRecordTest {
public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
};
testInsertViaSqlTypeStatements(sqlStatements, false);
@ -736,7 +737,7 @@ public class PutDatabaseRecordTest {
public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
};
@ -748,7 +749,7 @@ public class PutDatabaseRecordTest {
public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)",
"UPDATE PERSONS SET code = 101 WHERE id = 1"
@ -824,7 +825,7 @@ public class PutDatabaseRecordTest {
public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
};
testMultipleStatementsViaSqlStatementType(sqlStatements);
@ -835,7 +836,7 @@ public class PutDatabaseRecordTest {
public void testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
String[] sqlStatements = new String[] {
String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
};
@ -1731,7 +1732,7 @@ public class PutDatabaseRecordTest {
new ColumnDescription("name", 12, true, 255, true),
new ColumnDescription("code", 4, true, 10, true)
),
false,
false, null,
new HashSet<>(Arrays.asList("id")),
""
);
@ -2035,7 +2036,7 @@ public class PutDatabaseRecordTest {
assertEquals(1, resultSet.getInt(1));
Blob blob = resultSet.getBlob(2);
assertArrayEquals(new byte[] {(byte) 171, (byte) 205, (byte) 239}, blob.getBytes(1, (int) blob.length()));
assertArrayEquals(new byte[]{(byte) 171, (byte) 205, (byte) 239}, blob.getBytes(1, (int) blob.length()));
stmt.close();
conn.close();
@ -2201,7 +2202,7 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("content", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()).getFieldType());
parser.addRecord(1, "rec1", 101, new Integer[] {1, 2, 3});
parser.addRecord(1, "rec1", 101, new Integer[]{1, 2, 3});
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
@ -2320,8 +2321,8 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType());
byte[] longVarBinaryValue1 = new byte[] {97, 98, 99};
byte[] longVarBinaryValue2 = new byte[] {100, 101, 102};
byte[] longVarBinaryValue1 = new byte[]{97, 98, 99};
byte[] longVarBinaryValue2 = new byte[]{100, 101, 102};
parser.addRecord(1, longVarBinaryValue1);
parser.addRecord(2, longVarBinaryValue2);
@ -2421,7 +2422,7 @@ public class PutDatabaseRecordTest {
static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
@Override
SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException {
SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings, NameNormalizer normalizer) throws IllegalArgumentException {
return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3));
}
}
@ -2459,7 +2460,7 @@ public class PutDatabaseRecordTest {
@Override
public Connection getConnection() throws ProcessException {
try {
Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"));
Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"));
doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false);
return spyConnection;
} catch (final Exception e) {
@ -2474,6 +2475,7 @@ public class PutDatabaseRecordTest {
this.rollbackOnFailure = rollbackOnFailure;
this.batchSize = batchSize;
}
private Boolean autoCommit = null;
private Boolean rollbackOnFailure = null;
private Integer batchSize = null;

View File

@ -23,8 +23,11 @@ import java.util.Collections;
import java.util.List;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -144,14 +147,15 @@ public class TestOracle12DatabaseAdapter {
Collection<String> uniqueKeyColumnNames = Arrays.asList("column1", "column4");
String expected = "MERGE INTO table USING (SELECT ? column1, ? column2, ? column3, ? column_4 FROM DUAL) n" +
" ON (table.column1 = n.column1 AND table.column_4 = n.column_4) WHEN NOT MATCHED THEN" +
" INSERT (column1, column2, column3, column_4) VALUES (n.column1, n.column2, n.column3, n.column_4)" +
" WHEN MATCHED THEN UPDATE SET table.column2 = n.column2, table.column3 = n.column3";
" ON (table.column1 = n.column1 AND table.column_4 = n.column_4) WHEN NOT MATCHED THEN" +
" INSERT (column1, column2, column3, column_4) VALUES (n.column1, n.column2, n.column3, n.column_4)" +
" WHEN MATCHED THEN UPDATE SET table.column2 = n.column2, table.column3 = n.column3";
// WHEN
// THEN
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
}
@Test
public void testGetCreateTableStatement() {
assertTrue(db.supportsCreateTableIfNotExists());
@ -159,7 +163,9 @@ public class TestOracle12DatabaseAdapter {
new ColumnDescription("col1", Types.INTEGER, true, 4, false),
new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
);
TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
NameNormalizer normalizer = NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE, null);
TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns,
true, normalizer, Collections.singleton("COL1"), db.getColumnQuoteString());
String expectedStatement = "DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
// Strings are returned as VARCHAR2(2000) regardless of reported size and that VARCHAR2 is not in java.sql.Types

View File

@ -17,8 +17,11 @@
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.junit.jupiter.api.Test;
import java.sql.Types;
@ -118,7 +121,9 @@ public class TestOracleDatabaseAdapter {
new ColumnDescription("col1", Types.INTEGER, true, 4, false),
new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
);
TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
NameNormalizer normalizer = NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE, null);
TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns,
true, normalizer, Collections.singleton("COL1"), db.getColumnQuoteString());
String expectedStatement = "DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
// Strings are returned as VARCHAR2(2000) regardless of reported size and that VARCHAR2 is not in java.sql.Types