NIFI-3540: QueryDatabaseTable Failing to Track MS SQL Max Values

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1547
This commit is contained in:
patricker 2017-02-28 13:18:36 -07:00 committed by Matt Burgess
parent 0d66b6dcda
commit bf2f04fb5f
1 changed files with 5 additions and 3 deletions

View File

@ -281,7 +281,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
try { try {
fileToProcess = session.write(fileToProcess, out -> { fileToProcess = session.write(fileToProcess, out -> {
// Max values will be updated in the state property map by the callback // Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
try { try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro)); nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro));
} catch (SQLException | RuntimeException e) { } catch (SQLException | RuntimeException e) {
@ -419,10 +419,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback { protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter; DatabaseAdapter dbAdapter;
Map<String, String> newColMap; Map<String, String> newColMap;
String tableName;
public MaxValueResultSetRowCollector(Map<String, String> stateMap, DatabaseAdapter dbAdapter) { public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
this.dbAdapter = dbAdapter; this.dbAdapter = dbAdapter;
newColMap = stateMap; newColMap = stateMap;
this.tableName = tableName;
} }
@Override @Override
@ -437,7 +439,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
if (nrOfColumns > 0) { if (nrOfColumns > 0) {
for (int i = 1; i <= nrOfColumns; i++) { for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase(); String colName = meta.getColumnName(i).toLowerCase();
String fullyQualifiedMaxValueKey = getStateKey(meta.getTableName(i), colName); String fullyQualifiedMaxValueKey = getStateKey(tableName, colName);
Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey); Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
// Skip any columns we're not keeping track of or whose value is null // Skip any columns we're not keeping track of or whose value is null
if (type == null || resultSet.getObject(i) == null) { if (type == null || resultSet.getObject(i) == null) {