NIFI-3455 Large row count paging

This closes #1499.
This commit is contained in:
patricker 2017-02-09 10:45:01 -07:00 committed by Pierre Villard
parent 8ffa1703ba
commit 9cfc13423d
6 changed files with 64 additions and 13 deletions

View File

@ -231,10 +231,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// Build a SELECT query with maximum-value columns (if present) // Build a SELECT query with maximum-value columns (if present)
final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null); final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
int rowCount = 0; long rowCount = 0;
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) { final Statement st = con.createStatement()) {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue(); final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds st.setQueryTimeout(queryTimeout); // timeout in seconds
@ -246,7 +246,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
if (resultSet.next()) { if (resultSet.next()) {
// Total row count is in the first column // Total row count is in the first column
rowCount = resultSet.getInt(1); rowCount = resultSet.getLong(1);
// Update the state map with the newly-observed maximum values // Update the state map with the newly-observed maximum values
ResultSetMetaData rsmd = resultSet.getMetaData(); ResultSetMetaData rsmd = resultSet.getMetaData();
@ -282,12 +282,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
throw new SQLException("No rows returned from metadata query: " + selectQuery); throw new SQLException("No rows returned from metadata query: " + selectQuery);
} }
final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
// Generate SQL statements to read "pages" of data // Generate SQL statements to read "pages" of data
for (int i = 0; i < numberOfFetches; i++) { for (long i = 0; i < numberOfFetches; i++) {
Integer limit = partitionSize == 0 ? null : partitionSize; long limit = partitionSize == 0 ? null : partitionSize;
Integer offset = partitionSize == 0 ? null : i * partitionSize; long offset = partitionSize == 0 ? null : i * partitionSize;
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));

View File

@ -34,5 +34,5 @@ public interface DatabaseAdapter {
* @param offset The value for the OFFSET clause (i.e. the number of rows to skip) * @param offset The value for the OFFSET clause (i.e. the number of rows to skip)
* @return A String containing a SQL SELECT statement with the given clauses applied * @return A String containing a SQL SELECT statement with the given clauses applied
*/ */
String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset); String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset);
} }

View File

@ -29,7 +29,7 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
} }
@Override @Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
if (StringUtils.isEmpty(tableName)) { if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty"); throw new IllegalArgumentException("Table name cannot be null or empty");
} }

View File

@ -29,7 +29,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
} }
@Override @Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
if (StringUtils.isEmpty(tableName)) { if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty"); throw new IllegalArgumentException("Table name cannot be null or empty");
} }
@ -66,7 +66,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
} }
if (nestedSelect) { if (nestedSelect) {
query.append(") a"); query.append(") a");
int offsetVal = 0; long offsetVal = 0;
if (offset != null) { if (offset != null) {
offsetVal = offset; offsetVal = offset;
} }

View File

@ -38,9 +38,11 @@ import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException; import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -49,6 +51,11 @@ import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/** /**
@ -58,6 +65,7 @@ public class TestGenerateTableFetch {
TestRunner runner; TestRunner runner;
GenerateTableFetch processor; GenerateTableFetch processor;
DBCPServiceSimpleImpl dbcp;
private final static String DB_LOCATION = "target/db_gtf"; private final static String DB_LOCATION = "target/db_gtf";
@ -93,7 +101,9 @@ public class TestGenerateTableFetch {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
processor = new GenerateTableFetch(); processor = new GenerateTableFetch();
final DBCPService dbcp = new DBCPServiceSimpleImpl(); //Mock the DBCP Controller Service so we can control the Results
dbcp = spy(new DBCPServiceSimpleImpl());
final Map<String, String> dbcpProperties = new HashMap<>(); final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
@ -514,6 +524,47 @@ public class TestGenerateTableFetch {
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
} }
@Test
public void testRidiculousRowCount() throws ClassNotFoundException, SQLException, InitializationException, IOException {
long rowCount= Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100;
int partitionSize = 1000000;
int expectedFileCount = (int)(rowCount/partitionSize) + 1;
Connection conn = mock(Connection.class);
when(dbcp.getConnection()).thenReturn(conn);
Statement st = mock(Statement.class);
when(conn.createStatement()).thenReturn(st);
doNothing().when(st).close();
ResultSet rs = mock(ResultSet.class);
when(st.executeQuery(anyString())).thenReturn(rs);
when(rs.next()).thenReturn(true);
when(rs.getInt(1)).thenReturn((int)rowCount);
when(rs.getLong(1)).thenReturn(rowCount);
final ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
when(rs.getMetaData()).thenReturn(resultSetMetaData);
when(resultSetMetaData.getColumnCount()).thenReturn(2);
when(resultSetMetaData.getTableName(1)).thenReturn("");
when(resultSetMetaData.getColumnType(1)).thenReturn(Types.INTEGER);
when(resultSetMetaData.getColumnName(1)).thenReturn("COUNT");
when(resultSetMetaData.getColumnType(2)).thenReturn(Types.INTEGER);
when(resultSetMetaData.getColumnName(2)).thenReturn("ID");
when(rs.getInt(2)).thenReturn(1000);
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, Integer.toString(partitionSize));
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, expectedFileCount);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query);
runner.clearTransferState();
}
/** /**
* Simple implementation only for GenerateTableFetch processor testing. * Simple implementation only for GenerateTableFetch processor testing.

View File

@ -30,7 +30,7 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
} }
@Override @Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
if (StringUtils.isEmpty(tableName)) { if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty"); throw new IllegalArgumentException("Table name cannot be null or empty");
} }