NIFI-3525 Add tablename attribute to QDT flowfiles

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #1537.
This commit is contained in:
ambud 2017-02-23 12:51:58 -06:00 committed by James Wing
parent bd91390105
commit 9424836412
2 changed files with 4 additions and 2 deletions

View File

@ -85,6 +85,7 @@ import java.util.stream.IntStream;
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor " + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation") + "per the State Management documentation")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "tablename", description="Name of the table being queried"),
@WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"), @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
@WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@ -101,6 +102,7 @@ import java.util.stream.IntStream;
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue."; public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";
@ -277,7 +279,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final AtomicLong nrOfRows = new AtomicLong(0L); final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile fileToProcess = session.create(); FlowFile fileToProcess = session.create();
try { try {
fileToProcess = session.write(fileToProcess, out -> { fileToProcess = session.write(fileToProcess, out -> {
// Max values will be updated in the state property map by the callback // Max values will be updated in the state property map by the callback
@ -297,7 +298,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
if (nrOfRows.get() > 0) { if (nrOfRows.get() > 0) {
// set attribute how many rows were selected // set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
if(maxRowsPerFlowFile > 0) { if(maxRowsPerFlowFile > 0) {
fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier); fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex)); fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));

View File

@ -192,6 +192,7 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");