NIFI-13351 Improved QueryDatabaseTable Processors to call session.putAttributes()

This closes #8919

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Jim Steinebrey 2024-06-03 19:04:17 -04:00 committed by exceptionfactory
parent 6ca03eae55
commit 85cf0e995e
No known key found for this signature in database

View File

@ -478,19 +478,22 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
// Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
if (outputBatchSize == 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
final Map<String, String> newAttributesMap = new HashMap<>();
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
// Get just the column name from the key
String key = entry.getKey();
String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
newAttributesMap.put("maxvalue." + colName, entry.getValue());
}
//set count on all FlowFiles
// Set count for all FlowFiles
if (maxRowsPerFlowFile > 0) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
newAttributesMap.put(FRAGMENT_COUNT, Integer.toString(fragmentIndex));
}
resultSetFlowFiles.set(i, session.putAllAttributes(resultSetFlowFiles.get(i), newAttributesMap));
}
}
} catch (final SQLException e) {