diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index ea24dd006c..3b68e292c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -17,11 +17,13 @@ package org.apache.nifi.processors.standard;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -30,6 +32,7 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -58,6 +61,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -75,10 +79,23 @@ import java.util.concurrent.atomic.AtomicLong;
         + "to fetch only those records that have max values greater than the retained values. This can be used for "
         + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
         + "per the State Management documentation")
-@WritesAttribute(attribute = "querydbtable.row.count")
+@WritesAttributes({
+        @WritesAttribute(attribute = "querydbtable.row.count"),
+        @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced")}) +@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " + + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; + public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue."; public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() @@ -90,6 +107,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("qdbt-max-rows") + .displayName("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. " + + "This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + public QueryDatabaseTable() { final Set r = new HashSet<>(); r.add(REL_SUCCESS); @@ -103,6 +130,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { pds.add(MAX_VALUE_COLUMN_NAMES); pds.add(QUERY_TIMEOUT); pds.add(FETCH_SIZE); + pds.add(MAX_ROWS_PER_FLOW_FILE); propDescriptors = Collections.unmodifiableList(pds); } @@ -116,6 +144,18 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { return propDescriptors; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + @OnScheduled public void setup(final ProcessContext context) { super.setup(context); @@ -124,7 +164,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { ProcessSession session = sessionFactory.createSession(); - FlowFile fileToProcess = null; + final List resultSetFlowFiles = new ArrayList<>(); final ComponentLog logger = getLogger(); @@ -134,6 +174,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger(); + + final Map maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -150,11 
@@ -150,11 +193,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         // set as the current state map (after the session has been committed)
         final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
 
+        //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
+        for(final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()){
+            if(!statePropertyMap.containsKey(maxProp.getKey())){
+                statePropertyMap.put(maxProp.getKey(), maxProp.getValue());
+            }
+        }
+
         List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) ? null : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
-        final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, stateMap);
+        final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, statePropertyMap);
         final StopWatch stopWatch = new StopWatch(true);
+        final String fragmentIdentifier = UUID.randomUUID().toString();
 
         try (final Connection con = dbcpService.getConnection();
              final Statement st = con.createStatement()) {
@@ -168,52 +219,78 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 }
             }
 
+            String jdbcURL = "DBCPService";
+            try {
+                DatabaseMetaData databaseMetaData = con.getMetaData();
+                if (databaseMetaData != null) {
+                    jdbcURL = databaseMetaData.getURL();
+                }
+            } catch (SQLException se) {
+                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+            }
+
             final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
             st.setQueryTimeout(queryTimeout); // timeout in seconds
 
+            try {
+                logger.debug("Executing query {}", new Object[]{selectQuery});
+                final ResultSet resultSet = st.executeQuery(selectQuery);
+                int fragmentIndex=0;
+                while(true) {
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
-            final AtomicLong nrOfRows = new AtomicLong(0L);
 
+                    FlowFile fileToProcess = session.create();
+                    fileToProcess = session.write(fileToProcess, out -> {
+                        // Max values will be updated in the state property map by the callback
+                        final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
+                        try {
+                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile));
+                        } catch (SQLException e) {
+                            throw new ProcessException("Error during database query or conversion of records to Avro.", e);
+                        }
+                    });
-            fileToProcess = session.create();
-            fileToProcess = session.write(fileToProcess, out -> {
-                try {
-                    logger.debug("Executing query {}", new Object[]{selectQuery});
-                    final ResultSet resultSet = st.executeQuery(selectQuery);
-                    // Max values will be updated in the state property map by the callback
-                    final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
-                    nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
 
+                    if (nrOfRows.get() > 0) {
+                        // set attribute how many rows were selected
+                        fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                } catch (final SQLException e) {
-                    throw new ProcessException("Error during database query or conversion of records to Avro", e);
-                }
-            });
 
+                        if(maxRowsPerFlowFile > 0) {
+                            fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
+                            fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
+                        }
-            if (nrOfRows.get() > 0) {
-                // set attribute how many rows were selected
-                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
 
+                        logger.info("{} contains {} Avro records; transferring to 'success'",
+                                new Object[]{fileToProcess, nrOfRows.get()});
-                logger.info("{} contains {} Avro records; transferring to 'success'",
-                        new Object[]{fileToProcess, nrOfRows.get()});
-                String jdbcURL = "DBCPService";
-                try {
-                    DatabaseMetaData databaseMetaData = con.getMetaData();
-                    if (databaseMetaData != null) {
-                        jdbcURL = databaseMetaData.getURL();
+                        session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+
+                        resultSetFlowFiles.add(fileToProcess);
+                    } else {
+                        // If there were no rows returned, don't send the flowfile
+                        session.remove(fileToProcess);
+                        context.yield();
+                        break;
                     }
-                } catch (SQLException se) {
-                    // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+
+                    fragmentIndex++;
                 }
-                session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-                session.transfer(fileToProcess, REL_SUCCESS);
-            } else {
-                // If there were no rows returned, don't send the flowfile
-                session.remove(fileToProcess);
-                context.yield();
+
+                //set count on all FlowFiles
+                if(maxRowsPerFlowFile > 0) {
+                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                        resultSetFlowFiles.set(i,
+                                session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                    }
+                }
+            } catch (final SQLException e) {
+                throw e;
             }
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+
         } catch (final ProcessException | SQLException e) {
             logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
-            if (fileToProcess != null) {
-                session.remove(fileToProcess);
+            if (!resultSetFlowFiles.isEmpty()) {
+                session.remove(resultSetFlowFiles);
             }
             context.yield();
         } finally {
@@ -228,18 +305,17 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     }
 
     protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
-                              StateMap stateMap) {
+                              Map<String, String> stateMap) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name must be specified");
         }
         final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
 
         // Check state map for last max values
-        if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) {
-            Map<String, String> stateProperties = stateMap.toMap();
+        if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
             List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
             for (String colName : maxValColumnNames) {
-                String maxValue = stateProperties.get(colName.toLowerCase());
+                String maxValue = stateMap.get(colName.toLowerCase());
                 if (!StringUtils.isEmpty(maxValue)) {
                     Integer type = columnTypeMap.get(colName.toLowerCase());
                     if (type == null) {
@@ -260,6 +336,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
     }
 
+    protected Map<String, String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
+        final Map<String, String> defaultMaxValues = new HashMap<>();
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
+            final String key = entry.getKey().getName();
+
+            if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) {
+                continue;
+            }
+
+            defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
+        }
+
+        return defaultMaxValues;
+    }
+
     protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
         DatabaseAdapter dbAdapter;
         Map<String, String> newColMap;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 5ece1490ba..8d81b34a01 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -79,7 +79,11 @@ public class JdbcCommon {
         return convertToAvroStream(rs, outStream, recordName, null);
     }
 
-    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) throws IOException, SQLException {
+        return convertToAvroStream(rs, outStream, recordName, callback, 0);
+    }
+
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows)
             throws SQLException, IOException {
         final Schema schema = createSchema(rs, recordName);
         final GenericRecord rec = new GenericData.Record(schema);
@@ -155,6 +159,9 @@ public class JdbcCommon {
                 }
                 dataFileWriter.append(rec);
                 nrOfRows += 1;
+
+                if(maxRows > 0 && nrOfRows == maxRows)
+                    break;
             }
 
             return nrOfRows;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index a373f8fe59..40fba54f45 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -30,6 +30,7 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.util.file.FileUtils;
@@ -43,6 +44,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.text.SimpleDateFormat;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -50,9 +52,11 @@ import java.sql.SQLNonTransientConnectionException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -133,18 +137,18 @@ public class QueryDatabaseTableTest {
         StateManager stateManager = runner.getStateManager();
         stateManager.setState(maxValues, Scope.CLUSTER);
         processor.putColumnType("id", Types.INTEGER);
-        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER));
+        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509", query);
 
         maxValues.put("date_created", "2016-03-07 12:34:56");
         stateManager.setState(maxValues, Scope.CLUSTER);
         processor.putColumnType("date_created", Types.TIMESTAMP);
-        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
 
         // Test Oracle strategy
         dbAdapter = new OracleDatabaseAdapter();
-        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
     }
 
@@ -329,7 +333,7 @@ public class QueryDatabaseTableTest {
         runner.clearTransferState();
     }
 
-    @Test
+    @Test
     public void testWithNullIntColumn() throws SQLException {
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
@@ -377,6 +381,176 @@ public class QueryDatabaseTableTest {
         assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
     }
 
+    @Test
+    public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount=0;
+        //create larger row set
+        for(int batch=0;batch<100;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");//Using a non-round number to make sure the last file is ragged
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 12);
+
+        //ensure all but the last file have 9 records each
+        for(int ff=0;ff<11;ff++) {
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(9, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+            assertEquals("12", mff.getAttribute("fragment.count"));
+        }
+
+        //last file should have 1 record
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+        assertEquals("12", mff.getAttribute("fragment.count"));
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Run again, this time should be a single partial flow file
+        for(int batch=0;batch<5;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(0), mff.getAttribute("fragment.index"));
+        assertEquals("1", mff.getAttribute("fragment.count"));
+        assertEquals(5, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Run again, this time should be a full batch and a partial
+        for(int batch=0;batch<14;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(9, getNumberOfRecordsFromStream(in));
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1).toByteArray());
+        assertEquals(5, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Run again with a cleaned state. Should get all rows split into batches
+        int ffCount = (int) Math.ceil((double)rowCount / 9D);
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount);
+
+        //ensure all but the last file have 9 records each
+        for(int ff=0;ff
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {