From 65b26e6f41e9de5a198a1b310811ffa825104923 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Mon, 28 Mar 2016 12:16:24 -0400 Subject: [PATCH] NIFI-1691: Add Fetch Size property to QueryDatabaseTable This closes #307 Signed-off-by: Bryan Bende --- .../standard/QueryDatabaseTable.java | 21 +++++++++++++++++++ .../standard/QueryDatabaseTableTest.java | 1 + 2 files changed, 22 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 08f6b4119e..9403eb81d3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -175,6 +175,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { .defaultValue("None") .build(); + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("Fetch Size") + .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be " + + "honored and/or exact. If the value specified is zero, then the hint is ignored.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + private final List propDescriptors; @@ -192,6 +201,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { pds.add(MAX_VALUE_COLUMN_NAMES); pds.add(QUERY_TIMEOUT); pds.add(SQL_PREPROCESS_STRATEGY); + pds.add(FETCH_SIZE); propDescriptors = Collections.unmodifiableList(pds); } @@ -251,6 +261,8 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue(); + final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); + final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -272,6 +284,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { + if (fetchSize != null && fetchSize > 0) { + try { + st.setFetchSize(fetchSize); + } catch (SQLException se) { + // Not all drivers support this, just log the error (at debug level) and move on + logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se); + } + } + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index d16b9c646e..f932e4d0e0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -154,6 +154,7 @@ public class QueryDatabaseTableTest { runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); assertEquals(3, getNumberOfRecordsFromStream(in)); runner.clearTransferState();