NIFI-1691: Add Fetch Size property to QueryDatabaseTable

This closes #307

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Burgess 2016-03-28 12:16:24 -04:00 committed by Bryan Bende
parent c3d54ab724
commit 65b26e6f41
2 changed files with 22 additions and 0 deletions

View File

@ -175,6 +175,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
.defaultValue("None")
.build();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
private final List<PropertyDescriptor> propDescriptors;
@ -192,6 +201,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(SQL_PREPROCESS_STRATEGY);
pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
@ -251,6 +261,8 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
@ -272,6 +284,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
if (fetchSize != null && fetchSize > 0) {
try {
st.setFetchSize(fetchSize);
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
}
}
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds

View File

@ -154,6 +154,7 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
assertEquals(3, getNumberOfRecordsFromStream(in));
runner.clearTransferState();