diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 31bec276f0..278cc30f25 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -270,11 +270,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                         fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
                     }
 
-                    // Add maximum values as attributes
-                    for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
-                        fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue());
-                    }
-
                     logger.info("{} contains {} Avro records; transferring to 'success'",
                             new Object[]{fileToProcess, nrOfRows.get()});
 
@@ -290,13 +285,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 fragmentIndex++;
             }
 
-            //set count on all FlowFiles
-            if(maxRowsPerFlowFile > 0) {
-                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                // Add maximum values as attributes
+                for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+                    resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue()));
+                }
+
+                //set count on all FlowFiles
+                if(maxRowsPerFlowFile > 0) {
                     resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
                 }
             }
 
+
         } catch (final SQLException e) {
             throw e;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 974a835808..f3904ef358 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -184,15 +184,21 @@ public class QueryDatabaseTableTest {
         runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
         runner.setIncomingConnection(false);
         runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
 
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
         assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
         InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
         runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
-        assertEquals(3, getNumberOfRecordsFromStream(in));
+        assertEquals(2, getNumberOfRecordsFromStream(in));
+
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
         runner.clearTransferState();
 
         // Run again, this time no flowfiles/rows should be transferred
@@ -200,6 +206,9 @@ public class QueryDatabaseTableTest {
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
         runner.clearTransferState();
 
+        //Remove Max Rows Per Flow File
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
+
         // Add a new row with a higher ID and run, one flowfile with one new row should be transferred
         stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
         runner.run();
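
The net effect of the patch is that QueryDatabaseTable now copies the maxvalue.* attributes onto every FlowFile it emits (and fragment.count whenever Max Rows Per Flow File is non-zero), rather than onto a single FlowFile only. Below is a minimal, illustrative sketch of an additional assertion that could follow the updated test above; it is not part of the patch, and it assumes the same runner, table contents, and MAX_ROWS_PER_FLOW_FILE = 2 setup as the test, plus java.util.List and org.apache.nifi.util.MockFlowFile already being imported by the test class.

        // Illustrative check only (not part of the patch): with the change applied, every
        // fragment produced by this run should carry the same maxvalue.id value and,
        // because MAX_ROWS_PER_FLOW_FILE > 0, a fragment.count equal to the fragment count.
        final List<MockFlowFile> fragments = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS);
        for (final MockFlowFile fragment : fragments) {
            fragment.assertAttributeEquals("maxvalue.id", "2");
            fragment.assertAttributeEquals("fragment.count", String.valueOf(fragments.size()));
        }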