mirror of https://github.com/apache/nifi.git
NIFI-4836 - Allow output of FlowFiles during result set processing in QueryDatabaseTable
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2447.
This commit is contained in:
parent
82e36f3c71
commit
2a5e21c11b
|
@ -94,15 +94,16 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
|
|||
@WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
|
||||
@WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
|
||||
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
|
||||
@WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of "
|
||||
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
|
||||
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
|
||||
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
|
||||
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
|
||||
+ "attribute will not be populated."),
|
||||
@WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
|
||||
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
|
||||
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
|
||||
+ "FlowFiles were produced"),
|
||||
@WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
|
||||
+ "suffix of the attribute is the name of the column")})
|
||||
+ "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated.")})
|
||||
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
|
||||
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
|
||||
public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
||||
|
@ -112,7 +113,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
|||
|
||||
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Fetch Size")
|
||||
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
|
||||
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
|
||||
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
|
||||
.defaultValue("0")
|
||||
.required(true)
|
||||
|
@ -123,8 +124,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
|||
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
|
||||
.name("qdbt-max-rows")
|
||||
.displayName("Max Rows Per Flow File")
|
||||
.description("The maximum number of result rows that will be included in a single FlowFile. " +
|
||||
"This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
|
||||
.description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
|
||||
+ "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
|
||||
.defaultValue("0")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("qdbt-output-batch-size")
|
||||
.displayName("Output Batch Size")
|
||||
.description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
|
||||
+ "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
|
||||
+ "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
|
||||
+ "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
|
||||
+ "property is set.")
|
||||
.defaultValue("0")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
|
@ -135,7 +150,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
|||
.name("qdbt-max-frags")
|
||||
.displayName("Maximum Number of Fragments")
|
||||
.description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
|
||||
"This prevents OutOfMemoryError when this processor ingests huge table.")
|
||||
"This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
|
||||
+ "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
|
||||
.defaultValue("0")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
|
@ -156,6 +172,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
|||
pds.add(QUERY_TIMEOUT);
|
||||
pds.add(FETCH_SIZE);
|
||||
pds.add(MAX_ROWS_PER_FLOW_FILE);
|
||||
pds.add(OUTPUT_BATCH_SIZE);
|
||||
pds.add(MAX_FRAGMENTS);
|
||||
pds.add(NORMALIZE_NAMES_FOR_AVRO);
|
||||
pds.add(USE_AVRO_LOGICAL_TYPES);
|
||||
|
@ -199,6 +216,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
|||
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
|
||||
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
|
||||
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
|
||||
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
|
||||
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
|
||||
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
|
||||
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
|
||||
: 0;
|
||||
|
@ -315,6 +334,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
|||
|
||||
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||
resultSetFlowFiles.add(fileToProcess);
|
||||
// If we've reached the batch size, send out the flow files
|
||||
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
|
||||
session.transfer(resultSetFlowFiles, REL_SUCCESS);
|
||||
session.commit();
|
||||
resultSetFlowFiles.clear();
|
||||
}
|
||||
} else {
|
||||
// If there were no rows returned, don't send the flowfile
|
||||
session.remove(fileToProcess);
|
||||
|
@ -328,22 +353,24 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
|
||||
// Add maximum values as attributes
|
||||
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
|
||||
// Get just the column name from the key
|
||||
String key = entry.getKey();
|
||||
String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
|
||||
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
|
||||
}
|
||||
// Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
|
||||
if (outputBatchSize == 0) {
|
||||
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
|
||||
// Add maximum values as attributes
|
||||
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
|
||||
// Get just the column name from the key
|
||||
String key = entry.getKey();
|
||||
String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
|
||||
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
|
||||
}
|
||||
|
||||
//set count on all FlowFiles
|
||||
if(maxRowsPerFlowFile > 0) {
|
||||
resultSetFlowFiles.set(i,
|
||||
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
|
||||
//set count on all FlowFiles
|
||||
if (maxRowsPerFlowFile > 0) {
|
||||
resultSetFlowFiles.set(i,
|
||||
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (final SQLException e) {
|
||||
throw e;
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ import java.util.Map;
|
|||
import java.util.TimeZone;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
|
@ -499,6 +500,62 @@ public class QueryDatabaseTableTest {
|
|||
assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOutputBatchSize() throws ClassNotFoundException, SQLException, InitializationException, IOException {
|
||||
|
||||
// load test data to database
|
||||
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
|
||||
Statement stmt = con.createStatement();
|
||||
InputStream in;
|
||||
MockFlowFile mff;
|
||||
|
||||
try {
|
||||
stmt.execute("drop table TEST_QUERY_DB_TABLE");
|
||||
} catch (final SQLException sqle) {
|
||||
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
|
||||
}
|
||||
|
||||
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
|
||||
int rowCount=0;
|
||||
// Create larger row set
|
||||
for(int batch=0;batch<100;batch++){
|
||||
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
|
||||
rowCount++;
|
||||
}
|
||||
|
||||
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
|
||||
runner.setIncomingConnection(false);
|
||||
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
|
||||
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
|
||||
runner.setVariable(MAX_ROWS_KEY, "7");
|
||||
runner.setProperty(QueryDatabaseTable.OUTPUT_BATCH_SIZE, "${outputBatchSize}");
|
||||
runner.setVariable("outputBatchSize", "4");
|
||||
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 15);
|
||||
|
||||
// Ensure all but the last file have 7 records each
|
||||
for(int ff=0;ff<14;ff++) {
|
||||
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff);
|
||||
in = new ByteArrayInputStream(mff.toByteArray());
|
||||
assertEquals(7, getNumberOfRecordsFromStream(in));
|
||||
|
||||
mff.assertAttributeExists("fragment.identifier");
|
||||
assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
|
||||
// No fragment.count set for flow files sent when Output Batch Size is set
|
||||
assertNull(mff.getAttribute("fragment.count"));
|
||||
}
|
||||
|
||||
// Last file should have 2 records
|
||||
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(14);
|
||||
in = new ByteArrayInputStream(mff.toByteArray());
|
||||
assertEquals(2, getNumberOfRecordsFromStream(in));
|
||||
mff.assertAttributeExists("fragment.identifier");
|
||||
assertEquals(Integer.toString(14), mff.getAttribute("fragment.index"));
|
||||
// No fragment.count set for flow files sent when Output Batch Size is set
|
||||
assertNull(mff.getAttribute("fragment.count"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException, InitializationException, IOException {
|
||||
|
||||
|
|
Loading…
Reference in New Issue