mirror of https://github.com/apache/nifi.git
NIFI-6934 In PutDatabaseRecord added DatabaseAdapter-based UPSERT support for Postgres (9.5+)
NIFI-6934 Added more documentation and unit tests. NIFI-6934 Added missing license for new test class. Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #4350
This commit is contained in:
@ -29,6 +29,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -47,6 +49,7 @@ import org.apache.nifi.processor.util.pattern.PartialFunctions;
import org.apache.nifi.processor.util.pattern.Put;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@ -70,11 +73,13 @@ import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -101,6 +106,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
static final String UPDATE_TYPE = "UPDATE";
static final String INSERT_TYPE = "INSERT";
static final String DELETE_TYPE = "DELETE";
static final String UPSERT_TYPE = "UPSERT";
static final String SQL_TYPE = "SQL"; // Not an allowable value in the Statement Type property, must be set by attribute
static final String USE_ATTR_TYPE = "Use statement.type Attribute";
@ -152,11 +158,14 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
.displayName("Statement Type")
.description("Specifies the type of SQL Statement to generate. If 'Use statement.type Attribute' is chosen, then the value is taken from the statement.type attribute in the "
.description("Specifies the type of SQL Statement to generate. "
+ "Please refer to the database documentation for a description of the behavior of each operation. "
+ "Please note that some Database Types may not support certain Statement Types. "
+ "If 'Use statement.type Attribute' is chosen, then the value is taken from the statement.type attribute in the "
+ "FlowFile. The 'Use statement.type Attribute' option is the only one that allows the 'SQL' statement type. If 'SQL' is specified, the value of the field specified by the "
+ "'Field Containing SQL' property is expected to be a valid SQL statement on the target database, and will be executed as-is.")
static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
@ -299,11 +308,34 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor DB_TYPE;
protected static final Map<String, DatabaseAdapter> dbAdapters;
protected static List<PropertyDescriptor> propDescriptors;
private Cache<SchemaKey, TableSchema> schemaCache;
static {
dbAdapters = new HashMap<>();
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(databaseAdapter -> {
dbAdapters.put(databaseAdapter.getName(), databaseAdapter);
dbAdapterValues.add(new AllowableValue(databaseAdapter.getName(), databaseAdapter.getName(), databaseAdapter.getDescription()));
DB_TYPE = new PropertyDescriptor.Builder()
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
final Set<Relationship> r = new HashSet<>();
@ -312,6 +344,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
final List<PropertyDescriptor> pds = new ArrayList<>();
@ -335,6 +368,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
private Put<FunctionContext, Connection> process;
private ExceptionHandler<FunctionContext> exceptionHandler;
private DatabaseAdapter databaseAdapter;
public Set<Relationship> getRelationships() {
@ -444,9 +478,29 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Collection<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
DatabaseAdapter databaseAdapter = dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
if (UPSERT_TYPE.equals(statementType) && !databaseAdapter.supportsUpsert()) {
validationResults.add(new ValidationResult.Builder()
.explanation(databaseAdapter.getName() + " does not support " + statementType)
return validationResults;
public void onScheduled(final ProcessContext context) {
databaseAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final int tableSchemaCacheSize = context.getProperty(TABLE_SCHEMA_CACHE_SIZE).asInteger();
schemaCache = Caffeine.newBuilder()
@ -657,6 +711,9 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
} else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
} else {
throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
@ -790,20 +847,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
throws IllegalArgumentException, SQLException {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
if (settings.failUnmappedColumns) {
throw new IllegalArgumentException(missingColMessage);
} else if (settings.warningUnmappedColumns) {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("INSERT INTO ");
@ -854,47 +898,59 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
SqlAndIncludedColumns generateUpsert(final RecordSchema recordSchema, final String tableName, final String updateKeys,
final TableSchema tableSchema, final DMLSettings settings)
throws IllegalArgumentException, SQLException, MalformedRecordException {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
List<String> fieldNames = recordSchema.getFieldNames();
if (fieldNames != null) {
int fieldCount = fieldNames.size();
for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
if (desc != null) {
if (settings.escapeColumnNames) {
usedColumnNames.add(tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString());
} else {
String sql = databaseAdapter.getUpsertStatement(tableName, usedColumnNames, normalizedKeyColumnNames);
return new SqlAndIncludedColumns(sql, usedColumnIndices);
SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, final String tableName, final String updateKeys,
final TableSchema tableSchema, final DMLSettings settings)
throws IllegalArgumentException, MalformedRecordException, SQLException {
final Set<String> updateKeyNames;
if (updateKeys == null) {
updateKeyNames = tableSchema.getPrimaryKeyColumnNames();
} else {
updateKeyNames = new HashSet<>();
for (final String updateKey : updateKeys.split(",")) {
if (updateKeyNames.isEmpty()) {
throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
final Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
final Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE ");
// Create a Set of all normalized Update Key names, and ensure that there is a field in the record
// for each of the Update Key fields.
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
final Set<String> normalizedUpdateNames = new HashSet<>();
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, settings.translateFieldNames);
if (!normalizedFieldNames.contains(normalizedUK)) {
String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
if (settings.failUnmappedColumns) {
throw new MalformedRecordException(missingColMessage);
} else if (settings.warningUnmappedColumns) {
// iterate over all of the fields in the record, building the SQL statement by adding the column names
List<String> fieldNames = recordSchema.getFieldNames();
final List<Integer> includedColumns = new ArrayList<>();
@ -920,7 +976,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
// Check if this column is an Update Key. If so, skip it for now. We will come
// back to it after we finish the SET clause
if (!normalizedUpdateNames.contains(normalizedColName)) {
if (!normalizedKeyColumnNames.contains(normalizedColName)) {
if (fieldsFound.getAndIncrement() > 0) {
sqlBuilder.append(", ");
@ -952,7 +1008,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
if (desc != null) {
// Check if this column is a Update Key. If so, add it to the WHERE clause
if (normalizedUpdateNames.contains(normalizedColName)) {
if (normalizedKeyColumnNames.contains(normalizedColName)) {
if (whereFieldCount.getAndIncrement() > 0) {
sqlBuilder.append(" AND ");
@ -1045,6 +1101,66 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
private void checkValuesForRequiredColumns(RecordSchema recordSchema, TableSchema tableSchema, DMLSettings settings) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
if (settings.failUnmappedColumns) {
throw new IllegalArgumentException(missingColMessage);
} else if (settings.warningUnmappedColumns) {
private Set<String> getUpdateKeyColumnNames(String tableName, String updateKeys, TableSchema tableSchema) throws SQLIntegrityConstraintViolationException {
final Set<String> updateKeyColumnNames;
if (updateKeys == null) {
updateKeyColumnNames = tableSchema.getPrimaryKeyColumnNames();
} else {
updateKeyColumnNames = new HashSet<>();
for (final String updateKey : updateKeys.split(",")) {
if (updateKeyColumnNames.isEmpty()) {
throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
return updateKeyColumnNames;
private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set<String> updateKeyColumnNames) throws MalformedRecordException {
// Create a Set of all normalized Update Key names, and ensure that there is a field in the record
// for each of the Update Key fields.
final Set<String> normalizedRecordFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
final Set<String> normalizedKeyColumnNames = new HashSet<>();
for (final String updateKeyColumnName : updateKeyColumnNames) {
final String normalizedKeyColumnName = normalizeColumnName(updateKeyColumnName, settings.translateFieldNames);
if (!normalizedRecordFieldNames.contains(normalizedKeyColumnName)) {
String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + updateKeyColumnName + "'";
if (settings.failUnmappedColumns) {
throw new MalformedRecordException(missingColMessage);
} else if (settings.warningUnmappedColumns) {
return normalizedKeyColumnNames;
private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
return colName == null ? null : (translateColumnNames ? colName.toUpperCase().replace("_", "") : colName);
@ -16,6 +16,9 @@
package org.apache.nifi.processors.standard.db;
import java.util.Collection;
import java.util.List;
* Interface for RDBMS/JDBC-specific code.
@ -55,6 +58,30 @@ public interface DatabaseAdapter {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset);
* Tells whether this adapter supports UPSERT.
* @return true if UPSERT is supported, false otherwise
default boolean supportsUpsert() {
return false;
* Returns an SQL UPSERT statement - i.e. UPDATE record or INSERT if id doesn't exist.
* <br /><br />
* There is no standard way of doing this so not all adapters support it - use together with {@link #supportsUpsert()}!
* @param table The name of the table in which to update/insert a record into.
* @param columnNames The name of the columns in the table to add values to.
* @param uniqueKeyColumnNames The name of the columns that form a unique key.
* @return A String containing the parameterized jdbc SQL statement.
* The order and number of parameters are the same as that of the provided column list.
default String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
throw new UnsupportedOperationException("UPSERT is not supported for " + getName());
* <p>Returns a bare identifier string by removing wrapping escape characters
* from identifier strings such as table and column names.</p>
@ -0,0 +1,75 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard.db.impl;
import com.google.common.base.Preconditions;
import org.apache.nifi.util.StringUtils;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter {
public String getName() {
return "PostgreSQL";
public String getDescription() {
return "Generates PostgreSQL compatible SQL";
public boolean supportsUpsert() {
return true;
public String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name cannot be null or blank");
Preconditions.checkArgument(columnNames != null && !columnNames.isEmpty(), "Column names cannot be null or empty");
Preconditions.checkArgument(uniqueKeyColumnNames != null && !uniqueKeyColumnNames.isEmpty(), "Key column names cannot be null or empty");
String columns = columnNames.stream()
.collect(Collectors.joining(", "));
String parameterizedInsertValues = columnNames.stream()
.map(__ -> "?")
.collect(Collectors.joining(", "));
String updateValues = columnNames.stream()
.map(columnName -> "EXCLUDED." + columnName)
.collect(Collectors.joining(", "));
String conflictClause = "(" + uniqueKeyColumnNames.stream().collect(Collectors.joining(", ")) + ")";
StringBuilder statementStringBuilder = new StringBuilder("INSERT INTO ")
.append(" VALUES ")
.append(" ON CONFLICT ")
.append(" DO UPDATE SET ")
.append(" = ")
return statementStringBuilder.toString();
@ -17,4 +17,5 @@ org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
@ -0,0 +1,108 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard.db.impl;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestPostgreSQLDatabaseAdapter {
private PostgreSQLDatabaseAdapter testSubject;
public void setUp() throws Exception {
testSubject = new PostgreSQLDatabaseAdapter();
public void testSupportsUpsert() throws Exception {
assertTrue(testSubject.getClass().getSimpleName() + " should support upsert", testSubject.supportsUpsert());
public void testGetUpsertStatementWithNullTableName() throws Exception {
testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
public void testGetUpsertStatementWithBlankTableName() throws Exception {
testGetUpsertStatement("", Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
public void testGetUpsertStatementWithNullColumnNames() throws Exception {
testGetUpsertStatement("notEmpty", null, Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
public void testGetUpsertStatementWithEmptyColumnNames() throws Exception {
testGetUpsertStatement("notEmpty", Collections.emptyList(), Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
public void testGetUpsertStatementWithNullKeyColumnNames() throws Exception {
testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), null, new IllegalArgumentException("Key column names cannot be null or empty"));
public void testGetUpsertStatementWithEmptyKeyColumnNames() throws Exception {
testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), Collections.emptyList(), new IllegalArgumentException("Key column names cannot be null or empty"));
public void testGetUpsertStatement() throws Exception {
String tableName = "table";
List<String> columnNames = Arrays.asList("column1","column2", "column3", "column4");
Collection<String> uniqueKeyColumnNames = Arrays.asList("column2","column4");
String expected = "INSERT INTO" +
" table(column1, column2, column3, column4) VALUES (?, ?, ?, ?)" +
" ON CONFLICT (column2, column4)" +
" (column1, column2, column3, column4) = (EXCLUDED.column1, EXCLUDED.column2, EXCLUDED.column3, EXCLUDED.column4)";
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
try {
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String)null);
} catch (IllegalArgumentException e) {
assertEquals(expected.getMessage(), e.getMessage());
private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) {
String actual = testSubject.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames);
assertEquals(expected, actual);
Reference in New Issue