NIFI-2641: This closes #928. Add max values as attributes to QueryDatabaseTable

This commit is contained in:
Matt Burgess 2016-08-24 09:38:38 -04:00 committed by joewitt
parent 04db806ace
commit ad7808f63d
2 changed files with 18 additions and 4 deletions

View File

@ -89,7 +89,9 @@ import java.util.concurrent.atomic.AtomicLong;
@WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")})
+ "FlowFiles were produced"),
@WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
+ "suffix of the attribute is the name of the column")})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@ -258,6 +260,11 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
}
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue());
}
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});

View File

@ -182,7 +182,9 @@ public class QueryDatabaseTableTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
assertEquals(3, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
@ -196,7 +198,9 @@ public class QueryDatabaseTableTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
in = new ByteArrayInputStream(flowFile.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
// Sanity check - run again, this time no flowfiles/rows should be transferred
@ -212,7 +216,10 @@ public class QueryDatabaseTableTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
in = new ByteArrayInputStream(flowFile.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();