NIFI-8142: Add "Insert Ignore" option to PutDatabaseRecord

NIFI-8142: Fix Checkstyle error

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4778
This commit is contained in:
snorlaxa 2021-01-27 17:21:36 +08:00 committed by Matthew Burgess
parent d08a0babb7
commit 04fb1aca47
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
6 changed files with 179 additions and 4 deletions

View File

@ -113,6 +113,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
public static final String INSERT_TYPE = "INSERT";
public static final String DELETE_TYPE = "DELETE";
public static final String UPSERT_TYPE = "UPSERT";
public static final String INSERT_IGNORE_TYPE = "INSERT_IGNORE";
public static final String SQL_TYPE = "SQL"; // Not an allowable value in the Statement Type property, must be set by attribute
public static final String USE_ATTR_TYPE = "Use statement.type Attribute";
public static final String USE_RECORD_PATH = "Use Record Path";
@ -172,7 +173,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
+ "FlowFile. The 'Use statement.type Attribute' option is the only one that allows the 'SQL' statement type. If 'SQL' is specified, the value of the field specified by the "
+ "'Field Containing SQL' property is expected to be a valid SQL statement on the target database, and will be executed as-is.")
.required(true)
.allowableValues(UPDATE_TYPE, INSERT_TYPE, UPSERT_TYPE, DELETE_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH)
.allowableValues(UPDATE_TYPE, INSERT_TYPE, UPSERT_TYPE, INSERT_IGNORE_TYPE, DELETE_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH)
.build();
static final PropertyDescriptor STATEMENT_TYPE_RECORD_PATH = new Builder()
@ -430,8 +431,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
DatabaseAdapter databaseAdapter = dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
if (UPSERT_TYPE.equals(statementType) && !databaseAdapter.supportsUpsert()) {
if ((UPSERT_TYPE.equals(statementType) && !databaseAdapter.supportsUpsert())
|| (INSERT_IGNORE_TYPE.equals(statementType) && !databaseAdapter.supportsInsertIgnore())) {
validationResults.add(new ValidationResult.Builder()
.subject(STATEMENT_TYPE.getDisplayName())
.valid(false)
@ -647,6 +648,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
} else if (INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateInsertIgnore(recordSchema, fqTableName, updateKeys, tableSchema, settings);
} else {
throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
}
@ -791,7 +794,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
if (INSERT_TYPE.equalsIgnoreCase(statementType) || UPDATE_TYPE.equalsIgnoreCase(statementType) || DELETE_TYPE.equalsIgnoreCase(statementType)
|| UPSERT_TYPE.equalsIgnoreCase(statementType) || SQL_TYPE.equalsIgnoreCase(statementType) || USE_RECORD_PATH.equalsIgnoreCase(statementType) ) {
|| UPSERT_TYPE.equalsIgnoreCase(statementType) || SQL_TYPE.equalsIgnoreCase(statementType) || USE_RECORD_PATH.equalsIgnoreCase(statementType)
|| INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
return statementType;
}
@ -955,6 +959,47 @@ public class PutDatabaseRecord extends AbstractProcessor {
return new SqlAndIncludedColumns(sql, usedColumnIndices);
}
SqlAndIncludedColumns generateInsertIgnore(final RecordSchema recordSchema, final String tableName, final String updateKeys,
final TableSchema tableSchema, final DMLSettings settings)
throws IllegalArgumentException, SQLException, MalformedRecordException {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, tableSchema.getQuotedIdentifierString());
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
List<String> fieldNames = recordSchema.getFieldNames();
if (fieldNames != null) {
int fieldCount = fieldNames.size();
for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
}
if (desc != null) {
if (settings.escapeColumnNames) {
usedColumnNames.add(tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString());
} else {
usedColumnNames.add(desc.getColumnName());
}
usedColumnIndices.add(i);
}
}
}
String sql = databaseAdapter.getInsertIgnoreStatement(tableName, usedColumnNames, normalizedKeyColumnNames);
return new SqlAndIncludedColumns(sql, usedColumnIndices);
}
SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, final String tableName, final String updateKeys,
final TableSchema tableSchema, final DMLSettings settings)
throws IllegalArgumentException, MalformedRecordException, SQLException {

View File

@ -67,6 +67,15 @@ public interface DatabaseAdapter {
return false;
}
/**
* Tells whether this adapter supports INSERT_IGNORE.
*
* @return true if INSERT_IGNORE is supported, false otherwise
*/
default boolean supportsInsertIgnore() {
return false;
}
/**
* Tells How many times the column values need to be inserted into the prepared statement. Some DBs (such as MySQL) need the values specified twice in the statement,
* some need only to specify them once.
@ -92,6 +101,21 @@ public interface DatabaseAdapter {
throw new UnsupportedOperationException("UPSERT is not supported for " + getName());
}
/**
* Returns an SQL INSERT_IGNORE statement - i.e. Ignore record or INSERT if id doesn't exist.
* <br /><br />
* There is no standard way of doing this so not all adapters support it - use together with {@link #supportsInsertIgnore()}!
*
* @param table The name of the table in which to ignore/insert a record into.
* @param columnNames The name of the columns in the table to add values to.
* @param uniqueKeyColumnNames The name of the columns that form a unique key.
* @return A String containing the parameterized jdbc SQL statement.
* The order and number of parameters are the same as that of the provided column list.
*/
default String getInsertIgnoreStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
throw new UnsupportedOperationException("UPSERT is not supported for " + getName());
}
/**
* <p>Returns a bare identifier string by removing wrapping escape characters
* from identifier strings such as table and column names.</p>

View File

@ -49,6 +49,11 @@ public class MySQLDatabaseAdapter extends GenericDatabaseAdapter {
return true;
}
@Override
public boolean supportsInsertIgnore() {
return true;
}
/**
* Tells How many times the column values need to be inserted into the prepared statement. Some DBs (such as MySQL) need the values specified twice in the statement,
* some need only to specify them once.
@ -86,7 +91,27 @@ public class MySQLDatabaseAdapter extends GenericDatabaseAdapter {
.append("(").append(parameterizedInsertValues).append(")")
.append(" ON DUPLICATE KEY UPDATE ")
.append(parameterizedUpdateValues);
return statementStringBuilder.toString();
}
@Override
public String getInsertIgnoreStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name cannot be null or blank");
Preconditions.checkArgument(columnNames != null && !columnNames.isEmpty(), "Column names cannot be null or empty");
Preconditions.checkArgument(uniqueKeyColumnNames != null && !uniqueKeyColumnNames.isEmpty(), "Key column names cannot be null or empty");
String columns = columnNames.stream()
.collect(Collectors.joining(", "));
String parameterizedInsertValues = columnNames.stream()
.map(__ -> "?")
.collect(Collectors.joining(", "));
StringBuilder statementStringBuilder = new StringBuilder("INSERT IGNORE INTO ")
.append(table)
.append("(").append(columns).append(")")
.append(" VALUES ")
.append("(").append(parameterizedInsertValues).append(")");
return statementStringBuilder.toString();
}
}

View File

@ -39,6 +39,11 @@ public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter {
return true;
}
@Override
public boolean supportsInsertIgnore() {
return true;
}
@Override
public String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name cannot be null or blank");
@ -72,4 +77,32 @@ public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter {
return statementStringBuilder.toString();
}
@Override
public String getInsertIgnoreStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name cannot be null or blank");
Preconditions.checkArgument(columnNames != null && !columnNames.isEmpty(), "Column names cannot be null or empty");
Preconditions.checkArgument(uniqueKeyColumnNames != null && !uniqueKeyColumnNames.isEmpty(), "Key column names cannot be null or empty");
String columns = columnNames.stream()
.collect(Collectors.joining(", "));
String parameterizedInsertValues = columnNames.stream()
.map(__ -> "?")
.collect(Collectors.joining(", "));
String conflictClause = "(" + uniqueKeyColumnNames.stream().collect(Collectors.joining(", ")) + ")";
StringBuilder statementStringBuilder = new StringBuilder("INSERT INTO ")
.append(table)
.append("(").append(columns).append(")")
.append(" VALUES ")
.append("(").append(parameterizedInsertValues).append(")")
.append(" ON CONFLICT ")
.append(conflictClause)
.append(" DO NOTHING");
return statementStringBuilder.toString();
}
}

View File

@ -89,6 +89,21 @@ public class TestMySQLDatabaseAdapter {
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
}
@Test
public void testGetInsertIgnoreStatement() throws Exception {
// GIVEN
String tableName = "table";
List<String> columnNames = Arrays.asList("column1", "column2", "column3", "column4");
Collection<String> uniqueKeyColumnNames = Arrays.asList("column2", "column4");
String ignoreExpected = "INSERT IGNORE INTO" +
" table(column1, column2, column3, column4) VALUES (?, ?, ?, ?)";
// WHEN
// THEN
testGetInsertIgnoreStatement(tableName, columnNames, uniqueKeyColumnNames, ignoreExpected);
}
private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
try {
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String) null);
@ -106,4 +121,12 @@ public class TestMySQLDatabaseAdapter {
assertEquals(expected, actual);
}
private void testGetInsertIgnoreStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) {
// WHEN
String actual = testSubject.getInsertIgnoreStatement(tableName, columnNames, uniqueKeyColumnNames);
// THEN
assertEquals(expected, actual);
}
}

View File

@ -89,6 +89,23 @@ public class TestPostgreSQLDatabaseAdapter {
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
}
@Test
public void testGetInsertIgnoreStatement() throws Exception {
// GIVEN
String tableName = "table";
List<String> columnNames = Arrays.asList("column1","column2", "column3", "column4");
Collection<String> uniqueKeyColumnNames = Arrays.asList("column2","column4");
String expected = "INSERT INTO" +
" table(column1, column2, column3, column4) VALUES (?, ?, ?, ?)" +
" ON CONFLICT (column2, column4)" +
" DO NOTHING";
// WHEN
// THEN
testGetInsertIgnoreStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
}
private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
try {
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String)null);
@ -106,6 +123,14 @@ public class TestPostgreSQLDatabaseAdapter {
assertEquals(expected, actual);
}
private void testGetInsertIgnoreStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) {
// WHEN
String actual = testSubject.getInsertIgnoreStatement(tableName, columnNames, uniqueKeyColumnNames);
// THEN
assertEquals(expected, actual);
}
@Test
public void testGetUpsertStatementQuoted() {
// GIVEN