NIFI-4393: Handle database specific identifier escape characters

QueryDatabaseTable and GenerateTableFetch processors were not able to
use max value state as expected, if max value column was wrapped with
escape characters. Due to a mis-match between computed state keys
and actual keys used in the managed state. State keys computed by
getStateKey method included escape characters while actual stored keys
did not. Resulted querying the same dataset again and again.

This commit added unwrapIdentifier method to DatabaseAdapter class to
remove database specific escape characters for identifiers such as table
and column names, so that max value state keys are populated correctly
even if identifiers are wrapped with escape characters.

This commit also added new DatabaseAdapter for MySQL, to handle MySQL
specific identifier escape with back-ticks.

This closes #2424

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Koji Kawamura 2018-01-23 15:15:36 +09:00 committed by Mike Thomsen
parent 18ad348107
commit 2007c207ab
9 changed files with 113 additions and 28 deletions

View File

@ -289,13 +289,13 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();
for (String maxValueColumn:maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim());
String colKey = getStateKey(tableName, maxValueColumn.trim(), dbAdapter);
maxValueQualifiedColumnNameList.add(colKey);
}
for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName);
String colKey = getStateKey(tableName, colName, dbAdapter);
//only include columns that are part of the maximum value tracking column list
if (!maxValueQualifiedColumnNameList.contains(colKey)) {
@ -307,7 +307,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
}
for (String maxValueColumn:maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase());
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase(), dbAdapter);
if (!columnTypeMap.containsKey(colKey)) {
throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
}
@ -506,14 +506,21 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
}
}
protected static String getStateKey(String prefix, String columnName) {
/**
* Construct a key string for a corresponding state value.
* @param prefix A prefix may contain database and table name, or just table name, this can be null
* @param columnName A column name
* @param adapter DatabaseAdapter is used to unwrap identifiers
* @return a state key string
*/
protected static String getStateKey(String prefix, String columnName, DatabaseAdapter adapter) {
StringBuilder sb = new StringBuilder();
if (prefix != null) {
sb.append(prefix.toLowerCase());
sb.append(adapter.unwrapIdentifier(prefix.toLowerCase()));
sb.append(NAMESPACE_DELIMITER);
}
if (columnName != null) {
sb.append(columnName.toLowerCase());
sb.append(adapter.unwrapIdentifier(columnName.toLowerCase()));
}
return sb.toString();
}

View File

@ -218,7 +218,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey);
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
@ -252,13 +252,13 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
Integer type = getColumnType(tableName, colName);
Integer type = getColumnType(tableName, colName, dbAdapter);
// Add a condition for the WHERE clause
maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
@ -299,7 +299,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// Since this column has been aliased lets check the label first,
// if there is no label we'll use the column name.
String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName, dbAdapter);
String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
// If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
@ -334,13 +334,13 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
Integer type = getColumnType(tableName, colName);
Integer type = getColumnType(tableName, colName, dbAdapter);
// Add a condition for the WHERE clause
maxValueClauses.add(colName + " <= " + getLiteralByType(type, maxValue, dbAdapter.getName()));
@ -410,23 +410,23 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
}
private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName);
private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName, DatabaseAdapter adapter) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName, adapter);
String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
maxValue = statePropertyMap.get(getStateKey(null, colName));
maxValue = statePropertyMap.get(getStateKey(null, colName, adapter));
}
return maxValue;
}
private Integer getColumnType(String tableName, String colName) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName);
private Integer getColumnType(String tableName, String colName, DatabaseAdapter adapter) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName, adapter);
Integer type = columnTypeMap.get(fullyQualifiedStateKey);
if (type == null && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
type = columnTypeMap.get(getStateKey(null, colName));
type = columnTypeMap.get(getStateKey(null, colName, adapter));
}
return type;

View File

@ -263,7 +263,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
//If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey);
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
@ -433,7 +433,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index);
String maxValueKey = getStateKey(tableName, colName);
String maxValueKey = getStateKey(tableName, colName, dbAdapter);
String maxValue = stateMap.get(maxValueKey);
if (StringUtils.isEmpty(maxValue)) {
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
@ -488,7 +488,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
if (nrOfColumns > 0) {
for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase();
String fullyQualifiedMaxValueKey = getStateKey(tableName, colName);
String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
// Skip any columns we're not keeping track of or whose value is null
if (type == null || resultSet.getObject(i) == null) {

View File

@ -37,4 +37,17 @@ public interface DatabaseAdapter {
* @return A String containing a SQL SELECT statement with the given clauses applied
*/
String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset);
/**
* <p>Returns a bare identifier string by removing wrapping escape characters
* from identifier strings such as table and column names.</p>
* <p>The default implementation of this method removes double quotes.
* If the target database engine supports different escape characters, then its DatabaseAdapter implementation should override
* this method so that such escape characters can be removed properly.</p>
* @param identifier An identifier which may be wrapped with escape characters
* @return An unwrapped identifier string, or null if the input identifier is null
*/
default String unwrapIdentifier(String identifier) {
return identifier == null ? null : identifier.replaceAll("\"", "");
}
}

View File

@ -17,12 +17,11 @@
package org.apache.nifi.processors.standard.db.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
/**
* A database adapter that generates MS SQL Compatible SQL for version 2008.
*/
public class MSSQL2008DatabaseAdapter implements DatabaseAdapter {
public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
@Override
public String getName() {
return "MS SQL 2008";

View File

@ -79,4 +79,10 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter {
return query.toString();
}
@Override
public String unwrapIdentifier(String identifier) {
// Remove double quotes and square brackets.
return identifier == null ? null : identifier.replaceAll("[\"\\[\\]]", "");
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
/**
* A generic database adapter that generates MySQL compatible SQL.
*/
public class MySQLDatabaseAdapter extends GenericDatabaseAdapter {
@Override
public String getName() {
return "MySQL";
}
@Override
public String getDescription() {
return "Generates MySQL compatible SQL";
}
@Override
public String unwrapIdentifier(String identifier) {
// Removes double quotes and back-ticks.
return identifier == null ? null : identifier.replaceAll("[\"`]", "");
}
}

View File

@ -17,3 +17,4 @@ org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.Oracle12DatabaseAdapter
org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.MSSQL2008DatabaseAdapter
org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter

View File

@ -29,6 +29,8 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@ -145,16 +147,35 @@ public class QueryDatabaseTableTest {
maxValues.put("id", "509");
StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
processor.putColumnType(processor.getStateKey("mytable", "id", dbAdapter), Types.INTEGER);
query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
processor.putColumnType(processor.getStateKey("mytable", "date_created", dbAdapter), Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
// Double quotes can be used to escape column and table names with most ANSI compatible database engines.
maxValues.put("mytable@!@date-created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType(processor.getStateKey("\"myTable\"", "\"DATE-CREATED\"", dbAdapter), Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "\"myTable\"", null, Arrays.asList("id", "\"DATE-CREATED\""), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM \"myTable\" WHERE id > 509 AND \"DATE-CREATED\" >= '2016-03-07 12:34:56'", query);
// Back-ticks can be used to escape MySQL column and table names.
dbAdapter = new MySQLDatabaseAdapter();
processor.putColumnType(processor.getStateKey("`myTable`", "`DATE-CREATED`", dbAdapter), Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "`myTable`", null, Arrays.asList("id", "`DATE-CREATED`"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM `myTable` WHERE id > 509 AND `DATE-CREATED` >= '2016-03-07 12:34:56'", query);
// Square brackets can be used to escape Microsoft SQL Server column and table names.
dbAdapter = new MSSQLDatabaseAdapter();
processor.putColumnType(processor.getStateKey("[myTable]", "[DATE-CREATED]", dbAdapter), Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "[myTable]", null, Arrays.asList("id", "[DATE-CREATED]"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM [myTable] WHERE id > 509 AND [DATE-CREATED] >= '2016-03-07 12:34:56'", query);
// Test Oracle strategy
dbAdapter = new OracleDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());