NIFI-2582 NIFI-2583

Signed-off-by: Matt Burgess <mattyb149@apache.org>

NIFI-2583 Tweaks

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #877
Peter Wicks 2016-08-16 19:10:16 -06:00 committed by Matt Burgess
parent 3f7216ab84
commit 1470a52b1b
3 changed files with 289 additions and 53 deletions
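Both features in this commit are exercised end to end by the tests at the bottom of this diff. For orientation, a flow developer would configure them roughly like this (a minimal sketch using NiFi's TestRunner; the property constants and the initial.maxvalue.* naming come from this diff, while the table/column values are stand-ins):

import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class QueryDatabaseTableConfigSketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(QueryDatabaseTable.class);
        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
        // NIFI-2582: cap each outgoing FlowFile at 9 result rows
        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");
        // NIFI-2583: dynamic property that seeds the max-value state before the first run
        runner.setProperty("initial.maxvalue.created_on", "1970-01-01 00:05:00.000");
    }
}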

QueryDatabaseTable.java

@@ -17,11 +17,13 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -30,6 +32,7 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
@@ -50,14 +53,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -75,10 +71,19 @@ import java.util.concurrent.atomic.AtomicLong;
+ "to fetch only those records that have max values greater than the retained values. This can be used for "
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttribute(attribute = "querydbtable.row.count")
@WritesAttributes({
@WritesAttribute(attribute = "querydbtable.row.count"),
@WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
@@ -90,6 +95,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.name("qdbt-max-rows")
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. " +
"This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
@@ -103,6 +118,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -116,6 +132,18 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
return propDescriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@OnScheduled
public void setup(final ProcessContext context) {
super.setup(context);
@@ -124,7 +152,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null;
final Set<FlowFile> resultSetFlowFiles = new HashSet<>();
final ComponentLog logger = getLogger();
@@ -134,6 +162,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
@@ -150,11 +181,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
// If an initial max value for a column has been specified via the dynamic properties, and that column is not already in the state manager, sync it to the state property map
for(final Map.Entry<String,String> maxProp : maxValueProperties.entrySet()){
if(!statePropertyMap.containsKey(maxProp.getKey())){
statePropertyMap.put(maxProp.getKey(), maxProp.getValue());
}
}
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, stateMap);
final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
@@ -168,52 +207,70 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             }
         }
+        String jdbcURL = "DBCPService";
+        try {
+            DatabaseMetaData databaseMetaData = con.getMetaData();
+            if (databaseMetaData != null) {
+                jdbcURL = databaseMetaData.getURL();
+            }
+        } catch (SQLException se) {
+            // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+        }
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         st.setQueryTimeout(queryTimeout); // timeout in seconds
-        final AtomicLong nrOfRows = new AtomicLong(0L);
-        fileToProcess = session.create();
-        fileToProcess = session.write(fileToProcess, out -> {
-            try {
-                logger.debug("Executing query {}", new Object[]{selectQuery});
-                final ResultSet resultSet = st.executeQuery(selectQuery);
-                // Max values will be updated in the state property map by the callback
-                final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
-                nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
-            } catch (final SQLException e) {
-                throw new ProcessException("Error during database query or conversion of records to Avro", e);
-            }
-        });
-        if (nrOfRows.get() > 0) {
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-            logger.info("{} contains {} Avro records; transferring to 'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            String jdbcURL = "DBCPService";
-            try {
-                DatabaseMetaData databaseMetaData = con.getMetaData();
-                if (databaseMetaData != null) {
-                    jdbcURL = databaseMetaData.getURL();
-                }
-            } catch (SQLException se) {
-                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
-            }
-            session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(fileToProcess, REL_SUCCESS);
-        } else {
-            // If there were no rows returned, don't send the flowfile
-            session.remove(fileToProcess);
-            context.yield();
-        }
+        try {
+            logger.debug("Executing query {}", new Object[]{selectQuery});
+            final ResultSet resultSet = st.executeQuery(selectQuery);
+            int fragmentIndex = 0;
+            while (true) {
+                final AtomicLong nrOfRows = new AtomicLong(0L);
+                FlowFile fileToProcess = session.create();
+                fileToProcess = session.write(fileToProcess, out -> {
+                    // Max values will be updated in the state property map by the callback
+                    final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
+                    try {
+                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile));
+                    } catch (SQLException e) {
+                        throw new ProcessException("Error during database query or conversion of records to Avro.", e);
+                    }
+                });
+                if (maxRowsPerFlowFile > 0) {
+                    fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
+                    fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
+                }
+                if (nrOfRows.get() > 0) {
+                    // set attribute how many rows were selected
+                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                    logger.info("{} contains {} Avro records; transferring to 'success'",
+                            new Object[]{fileToProcess, nrOfRows.get()});
+                    session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                    resultSetFlowFiles.add(fileToProcess);
+                } else {
+                    // If there were no rows returned, don't send the flowfile
+                    session.remove(fileToProcess);
+                    context.yield();
+                    break;
+                }
+                fragmentIndex++;
+            }
+        } catch (final SQLException e) {
+            throw e;
+        }
+        session.transfer(resultSetFlowFiles, REL_SUCCESS);
     } catch (final ProcessException | SQLException e) {
         logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
-        if (fileToProcess != null) {
-            session.remove(fileToProcess);
+        if (!resultSetFlowFiles.isEmpty()) {
+            session.remove(resultSetFlowFiles);
         }
         context.yield();
     } finally {
@@ -228,18 +285,17 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
StateMap stateMap) {
Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
}
final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
// Check state map for last max values
if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) {
Map<String, String> stateProperties = stateMap.toMap();
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
for (String colName : maxValColumnNames) {
String maxValue = stateProperties.get(colName.toLowerCase());
String maxValue = stateMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(colName.toLowerCase());
if (type == null) {
@@ -260,6 +316,20 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
final Map<String,String> defaultMaxValues = new HashMap<String, String>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();
if(!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) { continue; }
defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
}
return defaultMaxValues;
}
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
Map<String, String> newColMap;
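Since the new fragment.identifier and fragment.index attributes exist so consumers can correlate and re-order the FlowFiles produced from one query, a downstream step might do bookkeeping along these lines (a minimal sketch outside the NiFi API: only the two attribute names come from this commit; reducing each FlowFile to its attribute map, and the class and method names, are hypothetical):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FragmentGrouping {
    // Group FlowFiles (here reduced to attribute maps) by the query they came from,
    // then restore the order in which QueryDatabaseTable emitted them.
    public static Map<String, List<Map<String, String>>> groupFragments(List<Map<String, String>> flowFiles) {
        Map<String, List<Map<String, String>>> byQuery = new HashMap<>();
        for (Map<String, String> attrs : flowFiles) {
            // fragment.identifier is shared by every FlowFile from one result set
            byQuery.computeIfAbsent(attrs.get("fragment.identifier"), k -> new ArrayList<>()).add(attrs);
        }
        for (List<Map<String, String>> group : byQuery.values()) {
            // fragment.index is the FlowFile's position within that result set
            group.sort(Comparator.comparingInt(a -> Integer.parseInt(a.get("fragment.index"))));
        }
        return byQuery;
    }
}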

JdbcCommon.java

@@ -79,7 +79,11 @@ public class JdbcCommon {
return convertToAvroStream(rs, outStream, recordName, null);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) throws IOException, SQLException {
return convertToAvroStream(rs, outStream, recordName, callback, 0);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows)
throws SQLException, IOException {
final Schema schema = createSchema(rs, recordName);
final GenericRecord rec = new GenericData.Record(schema);
@@ -155,6 +159,9 @@ public class JdbcCommon {
}
dataFileWriter.append(rec);
nrOfRows += 1;
if(maxRows > 0 && nrOfRows == maxRows)
break;
}
return nrOfRows;
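One consequence of the new maxRows parameter worth noting: convertToAvroStream returns after writing at most maxRows rows but leaves the ResultSet cursor where it stopped, so repeated calls on the same ResultSet drain it in batches, which is exactly what the processor's while(true) loop above relies on. A standalone caller might look like this (a sketch: the connection plumbing, table name, and class are stand-ins; only the convertToAvroStream signature is from this diff, and JdbcCommon's package is assumed to be org.apache.nifi.processors.standard.util):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.nifi.processors.standard.util.JdbcCommon;

public class BatchedAvroExport {
    // Drain a result set into successive Avro streams of at most 9 rows each.
    public static void export(Connection con) throws SQLException, IOException {
        try (Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT * FROM MYTABLE")) { // MYTABLE is a stand-in
            long rows;
            do {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                // null callback = no max-value tracking; 9 = row cap (0 would mean unlimited)
                rows = JdbcCommon.convertToAvroStream(rs, out, "MYTABLE", null, 9);
                // rows drops to 0 once the cursor is exhausted, ending the loop
            } while (rows > 0);
        }
    }
}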

QueryDatabaseTableTest.java

@@ -43,6 +43,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@@ -50,9 +51,11 @@ import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -133,18 +136,18 @@ public class QueryDatabaseTableTest {
StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("id", Types.INTEGER);
query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER));
query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("date_created", Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
// Test Oracle strategy
dbAdapter = new OracleDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
}
@@ -329,7 +332,7 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
}
@Test
@Test
public void testWithNullIntColumn() throws SQLException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
@@ -377,6 +380,162 @@ public class QueryDatabaseTableTest {
assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
}
@Test
public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
int rowCount=0;
//create larger row set
for(int batch=0;batch<100;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");//Using a non-round number to make sure the last file is ragged
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 12);
//ensure all but the last file have 9 records each
for(int ff=0;ff<11;ff++) {
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
}
//last file should have 1 record
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Run again, this time should be a single partial flow file
for(int batch=0;batch<5;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(5, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again, this time should be a full batch and a partial
for(int batch=0;batch<14;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1).toByteArray());
assertEquals(5, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again with a cleaned state. Should get all rows split into batches
int ffCount = (int) Math.ceil((double)rowCount / 9D);
runner.getStateManager().clear(Scope.CLUSTER);
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount);
//ensure all but the last file have 9 records each
for(int ff=0;ff<ffCount-1;ff++) {
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
}
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ffCount-1).toByteArray());
assertEquals(rowCount%9, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
}
@Test
public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.setTimeInMillis(0);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
int rowCount=0;
//create larger row set
for(int batch=0;batch<10;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
rowCount++;
cal.add(Calendar.MINUTE, 1);
}
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
cal.setTimeInMillis(0);
cal.add(Calendar.MINUTE, 5);
runner.setProperty("initial.maxvalue.created_on", dateFormat.format(cal.getTime().getTime()));
// Initial run with no previous state. Should get only last 4 records
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(4, getNumberOfRecordsFromStream(in));
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
// Validate Max Value doesn't change also
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
// Append a new row, expect 1 flowfile one row
cal.setTimeInMillis(0);
cal.add(Calendar.MINUTE, rowCount);
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
rowCount++;
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER);
runner.clearTransferState();
}
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
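The diff is truncated here, mid-method. Judging from how the tests above use getNumberOfRecordsFromStream, the remainder presumably just iterates the Avro container and counts records, along these lines (a sketch of the missing tail, not the verbatim code):

        GenericRecord record = null;
        long recordsFromStream = 0;
        while (dataFileReader.hasNext()) {
            // reuse the same record instance to avoid per-row allocation
            record = dataFileReader.next(record);
            recordsFromStream += 1;
        }
        return recordsFromStream;
    }
}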