NIFI-4286: Fix NPE in GenerateTableFetch when Partition Size = 0

This closes #2078.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Burgess 2017-08-11 23:38:17 -04:00 committed by Bryan Bende
parent 8aa4450084
commit 50f22162b0
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 33 additions and 3 deletions

View File

@ -318,12 +318,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
throw new SQLException("No rows returned from metadata query: " + selectQuery); throw new SQLException("No rows returned from metadata query: " + selectQuery);
} }
final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
// Generate SQL statements to read "pages" of data // Generate SQL statements to read "pages" of data
for (long i = 0; i < numberOfFetches; i++) { for (long i = 0; i < numberOfFetches; i++) {
long limit = partitionSize == 0 ? null : partitionSize; Long limit = partitionSize == 0 ? null : (long) partitionSize;
long offset = partitionSize == 0 ? null : i * partitionSize; Long offset = partitionSize == 0 ? null : i * partitionSize;
final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset); final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);

View File

@ -218,6 +218,36 @@ public class TestGenerateTableFetch {
runner.clearTransferState(); runner.clearTransferState();
} }
@Test
public void testOnePartition() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
// Set partition size to 0 so we can see that the flow file gets all rows
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "0");
runner.run();
runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID");
runner.clearTransferState();
}
@Test @Test
public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException { public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {