diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index a3cfd0c375..19192e57df 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -23,6 +23,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -47,12 +48,15 @@ import java.util.HashSet; import java.util.HexFormat; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.regex.Pattern; import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -79,7 +83,10 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processors.standard.db.ColumnDescription; import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.db.NameNormalizer; +import org.apache.nifi.processors.standard.db.NameNormalizerFactory; import org.apache.nifi.processors.standard.db.TableSchema; +import org.apache.nifi.processors.standard.db.TranslationStrategy; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathResult; @@ -184,25 +191,26 @@ public class PutDatabaseRecord extends AbstractProcessor { .build(); static final PropertyDescriptor STATEMENT_TYPE_RECORD_PATH = new Builder() - .name("Statement Type Record Path") - .displayName("Statement Type Record Path") - .description("Specifies a RecordPath to evaluate against each Record in order to determine the Statement Type. The RecordPath should equate to either INSERT, UPDATE, UPSERT, or DELETE. " - + "(Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)") - .required(true) - .addValidator(new RecordPathValidator()) - .expressionLanguageSupported(NONE) - .dependsOn(STATEMENT_TYPE, USE_RECORD_PATH) - .build(); + .name("Statement Type Record Path") + .displayName("Statement Type Record Path") + .description("Specifies a RecordPath to evaluate against each Record in order to determine the Statement Type. The RecordPath should equate to either INSERT, UPDATE, UPSERT, or DELETE. " + + "(Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)") + .required(true) + .addValidator(new RecordPathValidator()) + .expressionLanguageSupported(NONE) + .dependsOn(STATEMENT_TYPE, USE_RECORD_PATH) + .build(); static final PropertyDescriptor DATA_RECORD_PATH = new Builder() - .name("Data Record Path") - .displayName("Data Record Path") - .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" + - " the database instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to the database.") - .required(false) - .addValidator(new RecordPathValidator()) - .expressionLanguageSupported(NONE) - .build(); + .name("Data Record Path") + .displayName("Data Record Path") + .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming" + + " Record and the Record that results from evaluating the RecordPath will be sent to" + + " the database instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to the database.") + .required(false) + .addValidator(new RecordPathValidator()) + .expressionLanguageSupported(NONE) + .build(); static final PropertyDescriptor DBCP_SERVICE = new Builder() .name("put-db-record-dcbp-service") @@ -277,6 +285,25 @@ public class PutDatabaseRecord extends AbstractProcessor { .allowableValues("true", "false") .defaultValue("true") .build(); + public static final PropertyDescriptor TRANSLATION_STRATEGY = new PropertyDescriptor.Builder() + .required(true) + .name("Column Name Translation Strategy") + .description("The strategy used to normalize table column name. Column Name will be uppercased to " + + "do case-insensitive matching irrespective of strategy") + .allowableValues(TranslationStrategy.class) + .defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue()) + .dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue()) + .build(); + + public static final PropertyDescriptor TRANSLATION_PATTERN = new PropertyDescriptor.Builder() + .required(true) + .name("Column Name Translation Pattern") + .description("Column name will be normalized with this regular expression") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue()) + .dependsOn(TRANSLATION_STRATEGY, TranslationStrategy.PATTERN.getValue()) + .build(); static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new Builder() .name("put-db-record-unmatched-field-behavior") @@ -415,14 +442,14 @@ public class PutDatabaseRecord extends AbstractProcessor { }); DB_TYPE = new Builder() - .name("db-type") - .displayName("Database Type") - .description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type " - + "should suffice, but some databases (such as Oracle) require custom SQL clauses. ") - .allowableValues(dbAdapterValues.toArray(new AllowableValue[0])) - .defaultValue("Generic") - .required(false) - .build(); + .name("db-type") + .displayName("Database Type") + .description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type " + + "should suffice, but some databases (such as Oracle) require custom SQL clauses. ") + .allowableValues(dbAdapterValues.toArray(new AllowableValue[0])) + .defaultValue("Generic") + .required(false) + .build(); properties = List.of( RECORD_READER_FACTORY, @@ -436,6 +463,8 @@ public class PutDatabaseRecord extends AbstractProcessor { TABLE_NAME, BINARY_STRING_FORMAT, TRANSLATE_FIELD_NAMES, + TRANSLATION_STRATEGY, + TRANSLATION_PATTERN, UNMATCHED_FIELD_BEHAVIOR, UNMATCHED_COLUMN_BEHAVIOR, UPDATE_KEYS, @@ -473,12 +502,12 @@ public class PutDatabaseRecord extends AbstractProcessor { DatabaseAdapter databaseAdapter = dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue()); String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue(); if ((UPSERT_TYPE.equals(statementType) && !databaseAdapter.supportsUpsert()) - || (INSERT_IGNORE_TYPE.equals(statementType) && !databaseAdapter.supportsInsertIgnore())) { + || (INSERT_IGNORE_TYPE.equals(statementType) && !databaseAdapter.supportsInsertIgnore())) { validationResults.add(new ValidationResult.Builder() - .subject(STATEMENT_TYPE.getDisplayName()) - .valid(false) - .explanation(databaseAdapter.getName() + " does not support " + statementType) - .build() + .subject(STATEMENT_TYPE.getDisplayName()) + .valid(false) + .explanation(databaseAdapter.getName() + " does not support " + statementType) + .build() ); } @@ -495,15 +524,15 @@ public class PutDatabaseRecord extends AbstractProcessor { } if (autoCommit != null && autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) { - final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'." - + " Batch size equal to zero executes all statements in a single transaction" - + " which allows rollback to revert all the flow file's statements together if an error occurs.", - MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName()); + final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'." + + " Batch size equal to zero executes all statements in a single transaction" + + " which allows rollback to revert all the flow file's statements together if an error occurs.", + MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName()); - validationResults.add(new ValidationResult.Builder() - .subject(MAX_BATCH_SIZE.getDisplayName()) - .explanation(explanation) - .build()); + validationResults.add(new ValidationResult.Builder() + .subject(MAX_BATCH_SIZE.getDisplayName()) + .explanation(explanation) + .build()); } return validationResults; @@ -692,7 +721,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final String regex = "(? s.translateFieldNames) + .map(s -> NameNormalizerFactory.getNormalizer(s.translationStrategy, s.translationPattern)) + .orElse(null); final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName); final TableSchema tableSchema; try { tableSchema = schemaCache.get(schemaKey, key -> { try { - final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, updateKeys, log); + final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, normalizer, updateKeys, log); getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName); return schema; } catch (SQLException e) { @@ -780,7 +812,7 @@ public class PutDatabaseRecord extends AbstractProcessor { } // build the fully qualified table name - final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema); + final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema); final Map preparedSql = new HashMap<>(); int currentBatchSize = 0; @@ -805,15 +837,15 @@ public class PutDatabaseRecord extends AbstractProcessor { final SqlAndIncludedColumns sqlHolder; if (INSERT_TYPE.equalsIgnoreCase(statementType)) { - sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings); + sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings, normalizer); } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) { - sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings); + sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer); } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) { - sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings); + sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings, normalizer); } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) { - sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings); + sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer); } else if (INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) { - sqlHolder = generateInsertIgnore(recordSchema, fqTableName, updateKeys, tableSchema, settings); + sqlHolder = generateInsertIgnore(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer); } else { throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile)); } @@ -843,7 +875,7 @@ public class PutDatabaseRecord extends AbstractProcessor { if (ps != lastPreparedStatement && lastPreparedStatement != null) { batchIndex++; log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}", - sql, flowFile, fieldIndexes, batchIndex, currentBatchSize); + sql, flowFile, fieldIndexes, batchIndex, currentBatchSize); lastPreparedStatement.executeBatch(); session.adjustCounter("Batches Executed", 1, false); @@ -863,7 +895,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final DataType dataType = dataTypes.get(currentFieldIndex); final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType); final String fieldName = recordSchema.getField(currentFieldIndex).getFieldName(); - String columnName = ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames); + String columnName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer); int sqlType; final ColumnDescription column = columns.get(columnName); @@ -952,7 +984,7 @@ public class PutDatabaseRecord extends AbstractProcessor { if (++currentBatchSize == maxBatchSize) { batchIndex++; log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}", - sql, flowFile, fieldIndexes, batchIndex, currentBatchSize); + sql, flowFile, fieldIndexes, batchIndex, currentBatchSize); session.adjustCounter("Batches Executed", 1, false); ps.executeBatch(); currentBatchSize = 0; @@ -1081,7 +1113,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType(); if (fieldType != RecordFieldType.RECORD) { throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" + - " " + fieldType); + " " + fieldType); } } @@ -1185,18 +1217,18 @@ public class PutDatabaseRecord extends AbstractProcessor { return tableNameBuilder.toString(); } - private Set getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames) { + private Set getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames, NameNormalizer normalizer) { final Set normalizedFieldNames = new HashSet<>(); if (schema != null) { - schema.getFieldNames().forEach((fieldName) -> normalizedFieldNames.add(ColumnDescription.normalizeColumnName(fieldName, translateFieldNames))); + schema.getFieldNames().forEach((fieldName) -> normalizedFieldNames.add(TableSchema.normalizedName(fieldName, translateFieldNames, normalizer))); } return normalizedFieldNames; } - SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings) + SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer) throws IllegalArgumentException, SQLException { - checkValuesForRequiredColumns(recordSchema, tableSchema, settings); + checkValuesForRequiredColumns(recordSchema, tableSchema, settings, normalizer); final StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("INSERT INTO "); @@ -1214,7 +1246,7 @@ public class PutDatabaseRecord extends AbstractProcessor { RecordField field = recordSchema.getField(i); String fieldName = field.getFieldName(); - final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames)); + final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer)); if (desc == null && !settings.ignoreUnmappedFields) { throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); @@ -1254,13 +1286,13 @@ public class PutDatabaseRecord extends AbstractProcessor { } SqlAndIncludedColumns generateUpsert(final RecordSchema recordSchema, final String tableName, final String updateKeys, - final TableSchema tableSchema, final DMLSettings settings) - throws IllegalArgumentException, SQLException, MalformedRecordException { + final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer) + throws IllegalArgumentException, SQLException, MalformedRecordException { - checkValuesForRequiredColumns(recordSchema, tableSchema, settings); + checkValuesForRequiredColumns(recordSchema, tableSchema, settings, normalizer); Set keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema); - normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames); + normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, normalizer); List usedColumnNames = new ArrayList<>(); List usedColumnIndices = new ArrayList<>(); @@ -1273,7 +1305,7 @@ public class PutDatabaseRecord extends AbstractProcessor { RecordField field = recordSchema.getField(i); String fieldName = field.getFieldName(); - final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames)); + final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer)); if (desc == null && !settings.ignoreUnmappedFields) { throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); @@ -1308,13 +1340,13 @@ public class PutDatabaseRecord extends AbstractProcessor { } SqlAndIncludedColumns generateInsertIgnore(final RecordSchema recordSchema, final String tableName, final String updateKeys, - final TableSchema tableSchema, final DMLSettings settings) + final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer) throws IllegalArgumentException, SQLException, MalformedRecordException { - checkValuesForRequiredColumns(recordSchema, tableSchema, settings); + checkValuesForRequiredColumns(recordSchema, tableSchema, settings, normalizer); Set keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema); - normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames); + normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, normalizer); List usedColumnNames = new ArrayList<>(); List usedColumnIndices = new ArrayList<>(); @@ -1327,7 +1359,7 @@ public class PutDatabaseRecord extends AbstractProcessor { RecordField field = recordSchema.getField(i); String fieldName = field.getFieldName(); - final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames)); + final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer)); if (desc == null && !settings.ignoreUnmappedFields) { throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); @@ -1363,11 +1395,11 @@ public class PutDatabaseRecord extends AbstractProcessor { } SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, final String tableName, final String updateKeys, - final TableSchema tableSchema, final DMLSettings settings) + final TableSchema tableSchema, final DMLSettings settings, NameNormalizer normalizer) throws IllegalArgumentException, MalformedRecordException, SQLException { final Set keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema); - final Set normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames); + final Set normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, normalizer); final StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("UPDATE "); @@ -1386,8 +1418,8 @@ public class PutDatabaseRecord extends AbstractProcessor { RecordField field = recordSchema.getField(i); String fieldName = field.getFieldName(); - final String normalizedColName = ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames); - final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames)); + final String normalizedColName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer); + final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer)); if (desc == null) { if (!settings.ignoreUnmappedFields) { throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" @@ -1426,8 +1458,8 @@ public class PutDatabaseRecord extends AbstractProcessor { String fieldName = field.getFieldName(); boolean firstUpdateKey = true; - final String normalizedColName = ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames); - final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames)); + final String normalizedColName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer); + final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer)); if (desc != null) { // Check if this column is a Update Key. If so, add it to the WHERE clause @@ -1457,12 +1489,13 @@ public class PutDatabaseRecord extends AbstractProcessor { return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns); } - SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, String deleteKeys, final TableSchema tableSchema, final DMLSettings settings) + SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, String deleteKeys, final TableSchema tableSchema, + final DMLSettings settings, NameNormalizer normalizer) throws IllegalArgumentException, MalformedRecordException, SQLDataException { - final Set normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames); + final Set normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames, normalizer); for (final String requiredColName : tableSchema.getRequiredColumnNames()) { - final String normalizedColName = ColumnDescription.normalizeColumnName(requiredColName, settings.translateFieldNames); + final String normalizedColName = TableSchema.normalizedName(requiredColName, settings.translateFieldNames, normalizer); if (!normalizedFieldNames.contains(normalizedColName)) { String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'"; if (settings.failUnmappedColumns) { @@ -1505,7 +1538,7 @@ public class PutDatabaseRecord extends AbstractProcessor { RecordField field = recordSchema.getField(i); String fieldName = field.getFieldName(); - final ColumnDescription desc = tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames)); + final ColumnDescription desc = tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer)); if (desc == null && !settings.ignoreUnmappedFields) { throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n" + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet())); @@ -1554,11 +1587,11 @@ public class PutDatabaseRecord extends AbstractProcessor { return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns); } - private void checkValuesForRequiredColumns(RecordSchema recordSchema, TableSchema tableSchema, DMLSettings settings) { - final Set normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames); + private void checkValuesForRequiredColumns(RecordSchema recordSchema, TableSchema tableSchema, DMLSettings settings, NameNormalizer normalizer) { + final Set normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames, normalizer); for (final String requiredColName : tableSchema.getRequiredColumnNames()) { - final String normalizedColName = ColumnDescription.normalizeColumnName(requiredColName, settings.translateFieldNames); + final String normalizedColName = TableSchema.normalizedName(requiredColName, settings.translateFieldNames, normalizer); if (!normalizedFieldNames.contains(normalizedColName)) { String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'"; if (settings.failUnmappedColumns) { @@ -1590,15 +1623,15 @@ public class PutDatabaseRecord extends AbstractProcessor { return updateKeyColumnNames; } - private Set normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set updateKeyColumnNames) + private Set normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set updateKeyColumnNames, NameNormalizer normalizer) throws MalformedRecordException { // Create a Set of all normalized Update Key names, and ensure that there is a field in the record // for each of the Update Key fields. - final Set normalizedRecordFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames); + final Set normalizedRecordFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames, normalizer); final Set normalizedKeyColumnNames = new HashSet<>(); for (final String updateKeyColumnName : updateKeyColumnNames) { - String normalizedKeyColumnName = ColumnDescription.normalizeColumnName(updateKeyColumnName, settings.translateFieldNames); + String normalizedKeyColumnName = TableSchema.normalizedName(updateKeyColumnName, settings.translateFieldNames, normalizer); if (!normalizedRecordFieldNames.contains(normalizedKeyColumnName)) { String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + updateKeyColumnName + "'"; @@ -1651,7 +1684,8 @@ public class PutDatabaseRecord extends AbstractProcessor { SchemaKey schemaKey = (SchemaKey) o; if (catalog != null ? !catalog.equals(schemaKey.catalog) : schemaKey.catalog != null) return false; - if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) return false; + if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) + return false; return tableName.equals(schemaKey.tableName); } } @@ -1736,6 +1770,8 @@ public class PutDatabaseRecord extends AbstractProcessor { static class DMLSettings { private final boolean translateFieldNames; + private final TranslationStrategy translationStrategy; + private final Pattern translationPattern; private final boolean ignoreUnmappedFields; // Is the unmatched column behaviour fail or warning? @@ -1750,6 +1786,9 @@ public class PutDatabaseRecord extends AbstractProcessor { DMLSettings(ProcessContext context) { translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean(); + translationStrategy = TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue()); + final String translationRegex = context.getProperty(TRANSLATION_PATTERN).getValue(); + translationPattern = translationRegex == null ? null : Pattern.compile(translationRegex); ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue()); failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue()); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java index 239ebede08..00b9d1ba52 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java @@ -40,8 +40,11 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.pattern.DiscontinuedException; import org.apache.nifi.processors.standard.db.ColumnDescription; import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.db.NameNormalizer; +import org.apache.nifi.processors.standard.db.NameNormalizerFactory; import org.apache.nifi.processors.standard.db.TableNotFoundException; import org.apache.nifi.processors.standard.db.TableSchema; +import org.apache.nifi.processors.standard.db.TranslationStrategy; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; @@ -71,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.regex.Pattern; import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; @@ -177,6 +181,27 @@ public class UpdateDatabaseTable extends AbstractProcessor { .defaultValue("true") .build(); + public static final PropertyDescriptor TRANSLATION_STRATEGY = new PropertyDescriptor.Builder() + .required(true) + .name("Column Name Translation Strategy") + .description("The strategy used to normalize table column name. Column Name will be uppercased to " + + "do case-insensitive matching irrespective of strategy") + .allowableValues(TranslationStrategy.class) + .defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue()) + .dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue()) + .build(); + + public static final PropertyDescriptor TRANSLATION_PATTERN = new PropertyDescriptor.Builder() + .name("Column Name Translation Pattern") + .displayName("Column Name Translation Pattern") + .description("Column name will be normalized with this regular expression") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue()) + .dependsOn(TRANSLATION_STRATEGY, TranslationStrategy.PATTERN.getValue()) + .build(); + static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder() .name("updatedatabasetable-update-field-names") .displayName("Update Field Names") @@ -282,6 +307,8 @@ public class UpdateDatabaseTable extends AbstractProcessor { CREATE_TABLE, PRIMARY_KEY_FIELDS, TRANSLATE_FIELD_NAMES, + TRANSLATION_STRATEGY, + TRANSLATION_PATTERN, UPDATE_FIELD_NAMES, RECORD_WRITER_FACTORY, QUOTE_TABLE_IDENTIFIER, @@ -371,6 +398,13 @@ public class UpdateDatabaseTable extends AbstractProcessor { final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue()); final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean(); final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean(); + final TranslationStrategy translationStrategy = TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue()); + final String translationRegex = context.getProperty(TRANSLATION_PATTERN).getValue(); + final Pattern translationPattern = translationRegex == null ? null : Pattern.compile(translationRegex); + NameNormalizer normalizer = null; + if (translateFieldNames) { + normalizer = NameNormalizerFactory.getNormalizer(translationStrategy, translationPattern); + } if (recordWriterFactory == null && updateFieldNames) { throw new ProcessException("Record Writer must be set if 'Update Field Names' is true"); } @@ -393,7 +427,8 @@ public class UpdateDatabaseTable extends AbstractProcessor { primaryKeyColumnNames = null; } final OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(connection, databaseAdapter, recordSchema, - catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, updateFieldNames, primaryKeyColumnNames, quoteTableName, quoteColumnNames); + catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, normalizer, + updateFieldNames, primaryKeyColumnNames, quoteTableName, quoteColumnNames); if (outputMetadataHolder != null) { // The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile try { @@ -457,15 +492,16 @@ public class UpdateDatabaseTable extends AbstractProcessor { private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connection conn, final DatabaseAdapter databaseAdapter, final RecordSchema schema, final String catalogName, final String schemaName, final String tableName, - final boolean createIfNotExists, final boolean translateFieldNames, final boolean updateFieldNames, - final Set primaryKeyColumnNames, final boolean quoteTableName, final boolean quoteColumnNames) throws IOException { + final boolean createIfNotExists, final boolean translateFieldNames, final NameNormalizer normalizer, + final boolean updateFieldNames, final Set primaryKeyColumnNames, final boolean quoteTableName, + final boolean quoteColumnNames) throws IOException { // Read in the current table metadata, compare it to the reader's schema, and // add any columns from the schema that are missing in the table try (final Statement s = conn.createStatement()) { // Determine whether the table exists TableSchema tableSchema = null; try { - tableSchema = TableSchema.from(conn, catalogName, schemaName, tableName, translateFieldNames, null, getLogger()); + tableSchema = TableSchema.from(conn, catalogName, schemaName, tableName, translateFieldNames, normalizer, null, getLogger()); } catch (TableNotFoundException tnfe) { // Do nothing, the value will be populated if necessary } @@ -483,7 +519,7 @@ public class UpdateDatabaseTable extends AbstractProcessor { getLogger().debug("Adding column {} to table {}", recordFieldName, tableName); } - tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString()); + tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, normalizer, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString()); final String createTableSql = databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName, quoteColumnNames); @@ -502,7 +538,7 @@ public class UpdateDatabaseTable extends AbstractProcessor { final List dbColumns = new ArrayList<>(); for (final ColumnDescription columnDescription : tableSchema.getColumnsAsList()) { - dbColumns.add(ColumnDescription.normalizeColumnName(columnDescription.getColumnName(), translateFieldNames)); + dbColumns.add(TableSchema.normalizedName(columnDescription.getColumnName(), translateFieldNames, normalizer)); } final List columnsToAdd = new ArrayList<>(); @@ -511,7 +547,7 @@ public class UpdateDatabaseTable extends AbstractProcessor { // Handle new columns for (RecordField recordField : schema.getFields()) { final String recordFieldName = recordField.getFieldName(); - final String normalizedFieldName = ColumnDescription.normalizeColumnName(recordFieldName, translateFieldNames); + final String normalizedFieldName = TableSchema.normalizedName(recordFieldName, translateFieldNames, normalizer); if (!dbColumns.contains(normalizedFieldName)) { // The field does not exist in the table, add it ColumnDescription columnToAdd = new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()), diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java new file mode 100644 index 0000000000..42c41fab6d --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.db; + +public interface NameNormalizer { + + String getNormalizedName(String colName); + +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java new file mode 100644 index 0000000000..1e84c34375 --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db; + +import org.apache.nifi.processors.standard.db.impl.PatternNormalizer; +import org.apache.nifi.processors.standard.db.impl.RemoveAllSpecialCharNormalizer; +import org.apache.nifi.processors.standard.db.impl.RemoveSpaceNormalizer; +import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreAndSpaceNormalizer; +import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreNormalizer; + +import java.util.regex.Pattern; + +public class NameNormalizerFactory { + public static NameNormalizer getNormalizer(TranslationStrategy strategy, Pattern regex) { + + return switch (strategy) { + case REMOVE_UNDERSCORE -> new RemoveUnderscoreNormalizer(); + case REMOVE_SPACE -> new RemoveSpaceNormalizer(); + case REMOVE_UNDERSCORE_AND_SPACE -> new RemoveUnderscoreAndSpaceNormalizer(); + case REMOVE_ALL_SPECIAL_CHAR -> new RemoveAllSpecialCharNormalizer(); + case PATTERN -> new PatternNormalizer(regex); + }; + } +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java index 88e81e16be..dc7d3f7db2 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; + public class TableSchema { private final List requiredColumnNames; private final Set primaryKeyColumnNames; @@ -38,18 +39,20 @@ public class TableSchema { private final String schemaName; private final String tableName; - public TableSchema(final String catalogName, final String schemaName, final String tableName, final List columnDescriptions, final boolean translateColumnNames, - final Set primaryKeyColumnNames, final String quotedIdentifierString) { + public TableSchema(final String catalogName, final String schemaName, final String tableName, + final List columnDescriptions, final boolean translateColumnNames, + final NameNormalizer normalizer, + final Set primaryKeyColumnNames, final String quotedIdentifierString) { this.catalogName = catalogName; this.schemaName = schemaName; this.tableName = tableName; this.columns = new LinkedHashMap<>(); this.primaryKeyColumnNames = primaryKeyColumnNames; this.quotedIdentifierString = quotedIdentifierString; - this.requiredColumnNames = new ArrayList<>(); for (final ColumnDescription desc : columnDescriptions) { - columns.put(ColumnDescription.normalizeColumnName(desc.getColumnName(), translateColumnNames), desc); + final String colName = normalizedName(desc.getColumnName(), translateColumnNames, normalizer); + columns.put(colName, desc); if (desc.isRequired()) { requiredColumnNames.add(desc.getColumnName()); } @@ -89,7 +92,8 @@ public class TableSchema { } public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName, - final boolean translateColumnNames, final String updateKeys, ComponentLog log) throws SQLException { + final boolean translateColumnNames, final NameNormalizer normalizer, + final String updateKeys, ComponentLog log) throws SQLException { final DatabaseMetaData dmd = conn.getMetaData(); try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) { @@ -136,14 +140,23 @@ public class TableSchema { } else { // Parse the Update Keys field and normalize the column names for (final String updateKey : updateKeys.split(",")) { - primaryKeyColumns.add(ColumnDescription.normalizeColumnName(updateKey.trim(), translateColumnNames)); + final String colName = normalizedName(updateKey, translateColumnNames, normalizer); + primaryKeyColumns.add(colName); + } } - return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString()); + return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, normalizer, primaryKeyColumns, dmd.getIdentifierQuoteString()); } } + public static String normalizedName(final String name, final boolean translateNames, final NameNormalizer normalizer) { + if (translateNames && normalizer != null) { + return normalizer.getNormalizedName(name).trim().toUpperCase(); + } + return name; + } + @Override public String toString() { return "TableSchema[columns=" + columns.values() + "]"; diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java new file mode 100644 index 0000000000..7e3470a78a --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db; + +import org.apache.nifi.components.DescribedValue; + +/** + * Enumeration of supported Database column name Translation Strategies + */ +public enum TranslationStrategy implements DescribedValue { + REMOVE_UNDERSCORE("Remove Underscore", + "Underscores '_' will be removed from column names Ex: 'Pics_1_23' becomes 'PICS123'"), + REMOVE_SPACE("Remove Space", + "Spaces will be removed from column names Ex. 'User Name' becomes 'USERNAME'"), + REMOVE_UNDERSCORE_AND_SPACE("Remove Underscores and Spaces", + "Spaces and Underscores will be removed from column names Ex. 'User_1 Name' becomes 'USER1NAME'"), + REMOVE_ALL_SPECIAL_CHAR("Remove Regular Expression Characters", + "Remove Regular Expression Characters " + + "Ex. 'user-id' becomes USERID ,total(estimated) become TOTALESTIMATED"), + PATTERN("Regular Expression", + "Remove characters matching this Regular Expression from the column names Ex." + + "1. '\\d' will Remove all numbers " + + "2. '[^a-zA-Z0-9_]' will remove special characters except underscore"); + private final String displayName; + private final String description; + + TranslationStrategy(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java new file mode 100644 index 0000000000..72f01dee5d --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.db.impl; + +import org.apache.nifi.processors.standard.db.NameNormalizer; + +import java.util.regex.Pattern; + +public class PatternNormalizer implements NameNormalizer { + private final Pattern pattern; + + public PatternNormalizer(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public String getNormalizedName(String colName) { + + if (pattern == null) { + return colName; + } else { + return pattern.matcher(colName).replaceAll(""); + } + } + +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java new file mode 100644 index 0000000000..3d9d37700d --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db.impl; + +import org.apache.nifi.processors.standard.db.NameNormalizer; + +import java.util.regex.Pattern; + +public class RemoveAllSpecialCharNormalizer implements NameNormalizer { + private static final Pattern REMOVE_ALL_SPECIAL_CHAR_REGEX = Pattern.compile("[^a-zA-Z0-9]"); + @Override + public String getNormalizedName(String colName) { + return REMOVE_ALL_SPECIAL_CHAR_REGEX.matcher(colName).replaceAll(""); + } +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java new file mode 100644 index 0000000000..9f20b7ee8d --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db.impl; + +import org.apache.nifi.processors.standard.db.NameNormalizer; + +public class RemoveSpaceNormalizer implements NameNormalizer { + + @Override + public String getNormalizedName(String colName) { + return colName.replace(" ", ""); + } +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java new file mode 100644 index 0000000000..454684c07f --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db.impl; + +import org.apache.nifi.processors.standard.db.NameNormalizer; + +public class RemoveUnderscoreAndSpaceNormalizer implements NameNormalizer { + + + @Override + public String getNormalizedName(String colName) { + return colName.replace("_", "").replace(" ", ""); + } +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java new file mode 100644 index 0000000000..410007ce2d --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db.impl; + +import org.apache.nifi.processors.standard.db.NameNormalizer; + +public class RemoveUnderscoreNormalizer implements NameNormalizer { + + + @Override + public String getNormalizedName(String colName) { + return colName.replace("_", ""); + } +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index 307c230ced..6b85cc3f74 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -22,6 +22,7 @@ import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processors.standard.db.ColumnDescription; +import org.apache.nifi.processors.standard.db.NameNormalizer; import org.apache.nifi.processors.standard.db.TableSchema; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.MalformedRecordException; @@ -388,7 +389,7 @@ public class PutDatabaseRecordTest { new ColumnDescription("name", 12, true, 255, true), new ColumnDescription("code", 4, true, 10, true) ), - false, + false, null, new HashSet<>(Arrays.asList("id")), "" ); @@ -401,13 +402,13 @@ public class PutDatabaseRecordTest { final PutDatabaseRecord.DMLSettings settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext()); assertEquals("INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)", - processor.generateInsert(schema, "PERSONS", tableSchema, settings).getSql()); + processor.generateInsert(schema, "PERSONS", tableSchema, settings, null).getSql()); assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?", - processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings).getSql()); + processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings, null).getSql()); assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))", - processor.generateDelete(schema, "PERSONS", null, tableSchema, settings).getSql()); + processor.generateDelete(schema, "PERSONS", null, tableSchema, settings, null).getSql()); assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR (code is null AND ? is null))", - processor.generateDelete(schema, "PERSONS", "id, code", tableSchema, settings).getSql()); + processor.generateDelete(schema, "PERSONS", "id, code", tableSchema, settings, null).getSql()); } @Test @@ -429,7 +430,7 @@ public class PutDatabaseRecordTest { new ColumnDescription("name", 12, true, 255, true), new ColumnDescription("code", 4, true, 10, true) ), - false, + false, null, new HashSet<>(Arrays.asList("id")), "" ); @@ -442,17 +443,17 @@ public class PutDatabaseRecordTest { final PutDatabaseRecord.DMLSettings settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext()); SQLDataException e = assertThrows(SQLDataException.class, - () -> processor.generateInsert(schema, "PERSONS", tableSchema, settings), + () -> processor.generateInsert(schema, "PERSONS", tableSchema, settings, null), "generateInsert should fail with unmatched fields"); assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage()); e = assertThrows(SQLDataException.class, - () -> processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings), + () -> processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings, null), "generateUpdate should fail with unmatched fields"); assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage()); e = assertThrows(SQLDataException.class, - () -> processor.generateDelete(schema, "PERSONS", null, tableSchema, settings), + () -> processor.generateDelete(schema, "PERSONS", null, tableSchema, settings, null), "generateDelete should fail with unmatched fields"); assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage()); } @@ -725,7 +726,7 @@ public class PutDatabaseRecordTest { public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws InitializationException, ProcessException, SQLException { setRunner(testCase); - String[] sqlStatements = new String[] { + String[] sqlStatements = new String[]{ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)" }; testInsertViaSqlTypeStatements(sqlStatements, false); @@ -736,7 +737,7 @@ public class PutDatabaseRecordTest { public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase) throws InitializationException, ProcessException, SQLException { setRunner(testCase); - String[] sqlStatements = new String[] { + String[] sqlStatements = new String[]{ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)", "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);" }; @@ -748,7 +749,7 @@ public class PutDatabaseRecordTest { public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws InitializationException, ProcessException, SQLException { setRunner(testCase); - String[] sqlStatements = new String[] { + String[] sqlStatements = new String[]{ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)", "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)", "UPDATE PERSONS SET code = 101 WHERE id = 1" @@ -824,7 +825,7 @@ public class PutDatabaseRecordTest { public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException { setRunner(testCase); - String[] sqlStatements = new String[] { + String[] sqlStatements = new String[]{ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)" }; testMultipleStatementsViaSqlStatementType(sqlStatements); @@ -835,7 +836,7 @@ public class PutDatabaseRecordTest { public void testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException { setRunner(testCase); - String[] sqlStatements = new String[] { + String[] sqlStatements = new String[]{ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)", "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);" }; @@ -1731,7 +1732,7 @@ public class PutDatabaseRecordTest { new ColumnDescription("name", 12, true, 255, true), new ColumnDescription("code", 4, true, 10, true) ), - false, + false, null, new HashSet<>(Arrays.asList("id")), "" ); @@ -2035,7 +2036,7 @@ public class PutDatabaseRecordTest { assertEquals(1, resultSet.getInt(1)); Blob blob = resultSet.getBlob(2); - assertArrayEquals(new byte[] {(byte) 171, (byte) 205, (byte) 239}, blob.getBytes(1, (int) blob.length())); + assertArrayEquals(new byte[]{(byte) 171, (byte) 205, (byte) 239}, blob.getBytes(1, (int) blob.length())); stmt.close(); conn.close(); @@ -2201,7 +2202,7 @@ public class PutDatabaseRecordTest { parser.addSchemaField("code", RecordFieldType.INT); parser.addSchemaField("content", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()).getFieldType()); - parser.addRecord(1, "rec1", 101, new Integer[] {1, 2, 3}); + parser.addRecord(1, "rec1", 101, new Integer[]{1, 2, 3}); runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); @@ -2320,8 +2321,8 @@ public class PutDatabaseRecordTest { parser.addSchemaField("id", RecordFieldType.INT); parser.addSchemaField("name", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType()); - byte[] longVarBinaryValue1 = new byte[] {97, 98, 99}; - byte[] longVarBinaryValue2 = new byte[] {100, 101, 102}; + byte[] longVarBinaryValue1 = new byte[]{97, 98, 99}; + byte[] longVarBinaryValue2 = new byte[]{100, 101, 102}; parser.addRecord(1, longVarBinaryValue1); parser.addRecord(2, longVarBinaryValue2); @@ -2421,7 +2422,7 @@ public class PutDatabaseRecordTest { static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord { @Override - SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException { + SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings, NameNormalizer normalizer) throws IllegalArgumentException { return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3)); } } @@ -2459,7 +2460,7 @@ public class PutDatabaseRecordTest { @Override public Connection getConnection() throws ProcessException { try { - Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true")); + Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true")); doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false); return spyConnection; } catch (final Exception e) { @@ -2474,6 +2475,7 @@ public class PutDatabaseRecordTest { this.rollbackOnFailure = rollbackOnFailure; this.batchSize = batchSize; } + private Boolean autoCommit = null; private Boolean rollbackOnFailure = null; private Integer batchSize = null; diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java index 88052b2673..1409f253a0 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java @@ -23,8 +23,11 @@ import java.util.Collections; import java.util.List; import org.apache.nifi.processors.standard.db.ColumnDescription; +import org.apache.nifi.processors.standard.db.NameNormalizer; +import org.apache.nifi.processors.standard.db.NameNormalizerFactory; import org.apache.nifi.processors.standard.db.DatabaseAdapter; import org.apache.nifi.processors.standard.db.TableSchema; +import org.apache.nifi.processors.standard.db.TranslationStrategy; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -144,14 +147,15 @@ public class TestOracle12DatabaseAdapter { Collection uniqueKeyColumnNames = Arrays.asList("column1", "column4"); String expected = "MERGE INTO table USING (SELECT ? column1, ? column2, ? column3, ? column_4 FROM DUAL) n" + - " ON (table.column1 = n.column1 AND table.column_4 = n.column_4) WHEN NOT MATCHED THEN" + - " INSERT (column1, column2, column3, column_4) VALUES (n.column1, n.column2, n.column3, n.column_4)" + - " WHEN MATCHED THEN UPDATE SET table.column2 = n.column2, table.column3 = n.column3"; + " ON (table.column1 = n.column1 AND table.column_4 = n.column_4) WHEN NOT MATCHED THEN" + + " INSERT (column1, column2, column3, column_4) VALUES (n.column1, n.column2, n.column3, n.column_4)" + + " WHEN MATCHED THEN UPDATE SET table.column2 = n.column2, table.column3 = n.column3"; // WHEN // THEN testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected); } + @Test public void testGetCreateTableStatement() { assertTrue(db.supportsCreateTableIfNotExists()); @@ -159,7 +163,9 @@ public class TestOracle12DatabaseAdapter { new ColumnDescription("col1", Types.INTEGER, true, 4, false), new ColumnDescription("col2", Types.VARCHAR, false, 2000, true) ); - TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, true, Collections.singleton("COL1"), db.getColumnQuoteString()); + NameNormalizer normalizer = NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE, null); + TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, + true, normalizer, Collections.singleton("COL1"), db.getColumnQuoteString()); String expectedStatement = "DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE " // Strings are returned as VARCHAR2(2000) regardless of reported size and that VARCHAR2 is not in java.sql.Types diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java index 2e9f6b6171..99b8e3e3bd 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java @@ -17,8 +17,11 @@ package org.apache.nifi.processors.standard.db.impl; import org.apache.nifi.processors.standard.db.ColumnDescription; +import org.apache.nifi.processors.standard.db.NameNormalizer; +import org.apache.nifi.processors.standard.db.NameNormalizerFactory; import org.apache.nifi.processors.standard.db.DatabaseAdapter; import org.apache.nifi.processors.standard.db.TableSchema; +import org.apache.nifi.processors.standard.db.TranslationStrategy; import org.junit.jupiter.api.Test; import java.sql.Types; @@ -118,7 +121,9 @@ public class TestOracleDatabaseAdapter { new ColumnDescription("col1", Types.INTEGER, true, 4, false), new ColumnDescription("col2", Types.VARCHAR, false, 2000, true) ); - TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, true, Collections.singleton("COL1"), db.getColumnQuoteString()); + NameNormalizer normalizer = NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE, null); + TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, + true, normalizer, Collections.singleton("COL1"), db.getColumnQuoteString()); String expectedStatement = "DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE " // Strings are returned as VARCHAR2(2000) regardless of reported size and that VARCHAR2 is not in java.sql.Types