From 2a5e21c11b4c801c6b14e978f80f79cf807483a0 Mon Sep 17 00:00:00 2001
From: Matthew Burgess
Date: Thu, 1 Feb 2018 10:08:09 -0500
Subject: [PATCH] NIFI-4836 - Allow output of FlowFiles during result set
 processing in QueryDatabaseTable

Signed-off-by: Pierre Villard

This closes #2447.
---
 .../standard/QueryDatabaseTable.java     | 67 +++++++++++++------
 .../standard/QueryDatabaseTableTest.java | 57 ++++++++++++++++
 2 files changed, 104 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index ffe58455f2..56132060df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -94,15 +94,16 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
         @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
         @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
                 + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
-        @WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of "
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
                 + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
-                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+                + "attribute will not be populated."),
         @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
                 + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
                 + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
                 + "FlowFiles were produced"),
         @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
-                + "suffix of the attribute is the name of the column")})
+                + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated.")})
 @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
         + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
 public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@@ -112,7 +113,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("Fetch Size")
-            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
                     + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
             .defaultValue("0")
             .required(true)
@@ -123,8 +124,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
             .name("qdbt-max-rows")
             .displayName("Max Rows Per Flow File")
-            .description("The maximum number of result rows that will be included in a single FlowFile. " +
-                    "This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large " +
+                    "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
+
+    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("qdbt-output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows " +
+                    "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles " +
+                    "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will " +
+                    "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this " +
+                    "property is set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
@@ -135,7 +150,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             .name("qdbt-max-frags")
             .displayName("Maximum Number of Fragments")
             .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
-                    "This prevents OutOfMemoryError when this processor ingests huge table.")
+                    "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are " +
+                    "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
@@ -156,6 +172,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
@@ -199,6 +216,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
         final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
                 ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
                 : 0;
@@ -315,6 +334,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                         session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                         resultSetFlowFiles.add(fileToProcess);
+                        // If we've reached the batch size, send out the flow files
+                        if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                            session.commit();
+                            resultSetFlowFiles.clear();
+                        }
                     } else {
                         // If there were no rows returned, don't send the flowfile
                         session.remove(fileToProcess);
@@ -328,22 +353,24 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 }
             }
 
-            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                // Add maximum values as attributes
-                for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
-                    // Get just the column name from the key
-                    String key = entry.getKey();
-                    String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
-                    resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
-                }
+            // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
+            if (outputBatchSize == 0) {
+                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                    // Add maximum values as attributes
+                    for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+                        // Get just the column name from the key
+                        String key = entry.getKey();
+                        String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
+                        resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
+                    }
 
-                //set count on all FlowFiles
-                if(maxRowsPerFlowFile > 0) {
-                    resultSetFlowFiles.set(i,
-                            session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                    //set count on all FlowFiles
+                    if (maxRowsPerFlowFile > 0) {
+                        resultSetFlowFiles.set(i,
+                                session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                    }
                 }
             }
-
         } catch (final SQLException e) {
             throw e;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8a2319bba8..d80542384b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.TimeZone;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -499,6 +500,62 @@ public class QueryDatabaseTableTest {
         assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
     }
 
+    @Test
+    public void testOutputBatchSize() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount=0;
+        // Create larger row set
+        for(int batch=0;batch<100;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+        runner.setVariable(MAX_ROWS_KEY, "7");
+        runner.setProperty(QueryDatabaseTable.OUTPUT_BATCH_SIZE, "${outputBatchSize}");
+        runner.setVariable("outputBatchSize", "4");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 15);
+
+        // Ensure all but the last file have 7 records each
+        for(int ff=0;ff<14;ff++) {
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(7, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+            // No fragment.count set for flow files sent when Output Batch Size is set
+            assertNull(mff.getAttribute("fragment.count"));
+        }
+
+        // Last file should have 2 records
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(14);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        assertEquals(2, getNumberOfRecordsFromStream(in));
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(14), mff.getAttribute("fragment.index"));
+        // No fragment.count set for flow files sent when Output Batch Size is set
+        assertNull(mff.getAttribute("fragment.count"));
+    }
+
     @Test
     public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException, InitializationException, IOException {
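
For context, a minimal standalone sketch of the trade-off the new Output Batch Size property makes. This is plain Java with hypothetical names, not the NiFi API; it only illustrates why the patch cannot stamp fragment.count (or maxvalue.*) on FlowFiles once a batch has already been transferred and the session committed, since those totals are only known after the full result set has been processed.

    import java.util.ArrayList;
    import java.util.List;

    public class OutputBatchSketch {
        public static void main(String[] args) {
            int outputBatchSize = 4;   // analogous to the Output Batch Size property
            int maxRowsPerBatch = 7;   // analogous to Max Rows Per Flow File
            int totalRows = 100;       // mirrors the row count used in testOutputBatchSize

            List<String> pending = new ArrayList<>();
            int fragmentIndex = 0;
            for (int row = 0; row < totalRows; row += maxRowsPerBatch) {
                // Each iteration stands in for one output FlowFile (one fragment).
                pending.add("fragment.index=" + fragmentIndex++);
                if (outputBatchSize > 0 && pending.size() >= outputBatchSize) {
                    // Once these fragments are released downstream, they can no longer be
                    // updated, so the eventual total (fragment.count) cannot be set on them.
                    System.out.println("commit batch: " + pending);
                    pending.clear();
                }
            }
            System.out.println("final batch: " + pending + " (total fragments: " + fragmentIndex + ")");
        }
    }

With these numbers the sketch produces 15 fragments released in batches of 4, matching the 15 FlowFiles asserted in testOutputBatchSize; only fragment.index and fragment.identifier are known at creation time, which is why the early-committed FlowFiles omit fragment.count.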