diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 1fcb33ef83..f5407dad42 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -318,12 +318,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { throw new SQLException("No rows returned from metadata query: " + selectQuery); } - final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); + final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); // Generate SQL statements to read "pages" of data for (long i = 0; i < numberOfFetches; i++) { - long limit = partitionSize == 0 ? null : partitionSize; - long offset = partitionSize == 0 ? null : i * partitionSize; + Long limit = partitionSize == 0 ? null : (long) partitionSize; + Long offset = partitionSize == 0 ? null : i * partitionSize; final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 67ab72f30f..76bc1f0324 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -218,6 +218,36 @@ public class TestGenerateTableFetch { runner.clearTransferState(); } + @Test + public void testOnePartition() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + // Set partition size to 0 so we can see that the flow file gets all rows + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "0"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID"); + runner.clearTransferState(); + } + @Test public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {