NIFI-2749

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #997
This commit is contained in:
Peter Wicks 2016-09-08 21:37:21 -06:00 committed by Matt Burgess
parent fda15d916d
commit 938c7cccb8
2 changed files with 20 additions and 10 deletions

View File

@ -270,11 +270,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
}
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue());
}
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
@ -290,13 +285,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fragmentIndex++;
}
//set count on all FlowFiles
if(maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue()));
}
//set count on all FlowFiles
if(maxRowsPerFlowFile > 0) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
}
}
} catch (final SQLException e) {
throw e;
}

View File

@ -184,15 +184,21 @@ public class QueryDatabaseTableTest {
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
assertEquals(3, getNumberOfRecordsFromStream(in));
assertEquals(2, getNumberOfRecordsFromStream(in));
flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
in = new ByteArrayInputStream(flowFile.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
@ -200,6 +206,9 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
//Remove Max Rows Per Flow File
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
// Add a new row with a higher ID and run, one flowfile with one new row should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
runner.run();