NIFI-6865: Added Fetch Size property to ExecuteSQL processors

This closes #3888.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Matthew Burgess 2019-11-14 10:48:45 -05:00 committed by Peter Turcsanyi
parent b3880a4a06
commit d617c0b96a
4 changed files with 24 additions and 0 deletions

View File

@ -155,6 +155,17 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("esql-fetch-size")
.displayName("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected List<PropertyDescriptor> propDescriptors;
protected DBCPService dbcpService;
@ -203,6 +214,8 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
List<String> preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
List<String> postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
@ -222,6 +235,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
final PreparedStatement st = con.prepareStatement(selectQuery)) {
if (fetchSize != null && fetchSize > 0) {
try {
st.setFetchSize(fetchSize);
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
}
}
st.setQueryTimeout(queryTimeout); // timeout in seconds
// Execute pre-query, throw exception and cleanup Flow Files if fail

View File

@ -131,6 +131,7 @@ public class ExecuteSQL extends AbstractExecuteSQL {
pds.add(DEFAULT_SCALE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}

View File

@ -133,6 +133,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}

View File

@ -360,6 +360,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(AbstractExecuteSQL.FETCH_SIZE, "5");
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.run();