diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 4f852829fd..3b68e292c8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -53,7 +53,15 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.text.ParseException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -75,7 +83,11 @@ import java.util.concurrent.atomic.AtomicLong; @WritesAttribute(attribute = "querydbtable.row.count"), @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), - @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + @WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."), + @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + "FlowFiles were produced")}) @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " @@ -152,7 +164,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { ProcessSession session = sessionFactory.createSession(); - final Set resultSetFlowFiles = new HashSet<>(); + final List resultSetFlowFiles = new ArrayList<>(); final ComponentLog logger = getLogger(); @@ -261,6 +273,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { fragmentIndex++; } + + //set count on all FlowFiles + if(maxRowsPerFlowFile > 0) { + for (int i = 0; i < resultSetFlowFiles.size(); i++) { + resultSetFlowFiles.set(i, + session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex))); + } + } } catch (final SQLException e) { throw e; } @@ -322,7 +342,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { for (final Map.Entry entry : properties.entrySet()) { final String key = entry.getKey().getName(); - if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { continue; } + if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { + continue; + } defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue()); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 4279ca34de..40fba54f45 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -30,6 +30,7 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.file.FileUtils; @@ -387,6 +388,7 @@ public class QueryDatabaseTableTest { final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); Statement stmt = con.createStatement(); InputStream in; + MockFlowFile mff; try { stmt.execute("drop table TEST_QUERY_DB_TABLE"); @@ -412,13 +414,22 @@ public class QueryDatabaseTableTest { //ensure all but the last file have 9 records each for(int ff=0;ff<11;ff++) { - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray()); + mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff); + in = new ByteArrayInputStream(mff.toByteArray()); assertEquals(9, getNumberOfRecordsFromStream(in)); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); } //last file should have 1 record - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11).toByteArray()); + mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11); + in = new ByteArrayInputStream(mff.toByteArray()); assertEquals(1, getNumberOfRecordsFromStream(in)); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(11), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); runner.clearTransferState(); // Run again, this time no flowfiles/rows should be transferred @@ -434,7 +445,11 @@ public class QueryDatabaseTableTest { runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + in = new ByteArrayInputStream(mff.toByteArray()); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(0), mff.getAttribute("fragment.index")); + assertEquals("1", mff.getAttribute("fragment.count")); assertEquals(5, getNumberOfRecordsFromStream(in)); runner.clearTransferState();