diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 966c20db4b..48ec403991 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -231,10 +231,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { // Build a SELECT query with maximum-value columns (if present) final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null); - int rowCount = 0; + long rowCount = 0; try (final Connection con = dbcpService.getConnection(); - final Statement st = con.createStatement()) { + final Statement st = con.createStatement()) { final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds @@ -246,7 +246,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { if (resultSet.next()) { // Total row count is in the first column - rowCount = resultSet.getInt(1); + rowCount = resultSet.getLong(1); // Update the state map with the newly-observed maximum values ResultSetMetaData rsmd = resultSet.getMetaData(); @@ -282,12 +282,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { throw new SQLException("No rows returned from metadata query: " + selectQuery); } - final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); + final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); // Generate SQL statements to read "pages" of data - for (int i = 0; i < numberOfFetches; i++) { - Integer limit = partitionSize == 0 ? null : partitionSize; - Integer offset = partitionSize == 0 ? null : i * partitionSize; + for (long i = 0; i < numberOfFetches; i++) { + long limit = partitionSize == 0 ? null : partitionSize; + long offset = partitionSize == 0 ? null : i * partitionSize; final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java index 21ab3318db..b7f3e72885 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java @@ -34,5 +34,5 @@ public interface DatabaseAdapter { * @param offset The value for the OFFSET clause (i.e. the number of rows to skip) * @return A String containing a SQL SELECT statement with the given clauses applied */ - String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset); + String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java index c48d2cdb58..ae3af7ad04 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java @@ -29,7 +29,7 @@ public class GenericDatabaseAdapter implements DatabaseAdapter { } @Override - public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) { if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException("Table name cannot be null or empty"); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java index d918400dc7..9338343177 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java @@ -29,7 +29,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter { } @Override - public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) { if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException("Table name cannot be null or empty"); } @@ -66,7 +66,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter { } if (nestedSelect) { query.append(") a"); - int offsetVal = 0; + long offsetVal = 0; if (offset != null) { offsetVal = offset; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index f79f96cbc8..79093b2cf2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -38,9 +38,11 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.SQLNonTransientConnectionException; import java.sql.Statement; +import java.sql.Types; import java.util.HashMap; import java.util.Map; @@ -49,6 +51,11 @@ import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; /** @@ -58,6 +65,7 @@ public class TestGenerateTableFetch { TestRunner runner; GenerateTableFetch processor; + DBCPServiceSimpleImpl dbcp; private final static String DB_LOCATION = "target/db_gtf"; @@ -93,7 +101,9 @@ public class TestGenerateTableFetch { @Before public void setUp() throws Exception { processor = new GenerateTableFetch(); - final DBCPService dbcp = new DBCPServiceSimpleImpl(); + //Mock the DBCP Controller Service so we can control the Results + dbcp = spy(new DBCPServiceSimpleImpl()); + final Map dbcpProperties = new HashMap<>(); runner = TestRunners.newTestRunner(processor); @@ -514,6 +524,47 @@ public class TestGenerateTableFetch { assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); } + @Test + public void testRidiculousRowCount() throws ClassNotFoundException, SQLException, InitializationException, IOException { + long rowCount= Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100; + int partitionSize = 1000000; + int expectedFileCount = (int)(rowCount/partitionSize) + 1; + + Connection conn = mock(Connection.class); + when(dbcp.getConnection()).thenReturn(conn); + Statement st = mock(Statement.class); + when(conn.createStatement()).thenReturn(st); + doNothing().when(st).close(); + ResultSet rs = mock(ResultSet.class); + when(st.executeQuery(anyString())).thenReturn(rs); + when(rs.next()).thenReturn(true); + when(rs.getInt(1)).thenReturn((int)rowCount); + when(rs.getLong(1)).thenReturn(rowCount); + + final ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class); + when(rs.getMetaData()).thenReturn(resultSetMetaData); + when(resultSetMetaData.getColumnCount()).thenReturn(2); + when(resultSetMetaData.getTableName(1)).thenReturn(""); + when(resultSetMetaData.getColumnType(1)).thenReturn(Types.INTEGER); + when(resultSetMetaData.getColumnName(1)).thenReturn("COUNT"); + when(resultSetMetaData.getColumnType(2)).thenReturn(Types.INTEGER); + when(resultSetMetaData.getColumnName(2)).thenReturn("ID"); + when(rs.getInt(2)).thenReturn(1000); + + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, Integer.toString(partitionSize)); + + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, expectedFileCount); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + String query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query); + runner.clearTransferState(); + } + /** * Simple implementation only for GenerateTableFetch processor testing. diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java index 7d6ab8970f..66a473de54 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java @@ -30,7 +30,7 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter { } @Override - public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) { if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException("Table name cannot be null or empty"); }