diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 3b68e292c8..d7f4b24af3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -89,7 +89,9 @@ import java.util.concurrent.atomic.AtomicLong; @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " - + "FlowFiles were produced")}) + + "FlowFiles were produced"), + @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The " + + "suffix of the attribute is the name of the column")}) @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { @@ -258,6 +260,11 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex)); } + // Add maximum values as attributes + for (Map.Entry entry : statePropertyMap.entrySet()) { + fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue()); + } + logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 40fba54f45..e97e52926e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -182,7 +182,9 @@ public class QueryDatabaseTableTest { runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); assertEquals(3, getNumberOfRecordsFromStream(in)); runner.clearTransferState(); @@ -196,7 +198,9 @@ public class QueryDatabaseTableTest { stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "3"); + in = new ByteArrayInputStream(flowFile.toByteArray()); assertEquals(1, getNumberOfRecordsFromStream(in)); // Sanity check - run again, this time no flowfiles/rows should be transferred @@ -212,7 +216,10 @@ public class QueryDatabaseTableTest { stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "4"); + assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234"); + in = new ByteArrayInputStream(flowFile.toByteArray()); assertEquals(1, getNumberOfRecordsFromStream(in)); runner.clearTransferState();