(Apologies for merge commits, but it was not possible to rebase the 2574 commits (PR 872)).

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Andy LoPresto 2016-08-17 10:52:43 -07:00
commit 0ffdc2eb92
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
3 changed files with 318 additions and 45 deletions

View File

@ -17,11 +17,13 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -30,6 +32,7 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -58,6 +61,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -75,10 +79,23 @@ import java.util.concurrent.atomic.AtomicLong;
+ "to fetch only those records that have max values greater than the retained values. This can be used for " + "to fetch only those records that have max values greater than the retained values. This can be used for "
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor " + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation") + "per the State Management documentation")
@WritesAttribute(attribute = "querydbtable.row.count") @WritesAttributes({
@WritesAttribute(attribute = "querydbtable.row.count"),
@WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of "
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
@WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
@ -90,6 +107,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.name("qdbt-max-rows")
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. " +
"This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public QueryDatabaseTable() { public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>(); final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS); r.add(REL_SUCCESS);
@ -103,6 +130,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES); pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT); pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE); pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
propDescriptors = Collections.unmodifiableList(pds); propDescriptors = Collections.unmodifiableList(pds);
} }
@ -116,6 +144,18 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
return propDescriptors; return propDescriptors;
} }
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@OnScheduled @OnScheduled
public void setup(final ProcessContext context) { public void setup(final ProcessContext context) {
super.setup(context); super.setup(context);
@ -124,7 +164,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession(); ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null; final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
@ -134,6 +174,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
final StateManager stateManager = context.getStateManager(); final StateManager stateManager = context.getStateManager();
final StateMap stateMap; final StateMap stateMap;
@ -150,11 +193,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
// set as the current state map (after the session has been committed) // set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap()); final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
//If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for(final Map.Entry<String,String> maxProp : maxValueProperties.entrySet()){
if(!statePropertyMap.containsKey(maxProp.getKey())){
statePropertyMap.put(maxProp.getKey(), maxProp.getValue());
}
}
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null ? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*")); : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, stateMap); final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) { final Statement st = con.createStatement()) {
@ -168,31 +219,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
} }
} }
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
final AtomicLong nrOfRows = new AtomicLong(0L);
fileToProcess = session.create();
fileToProcess = session.write(fileToProcess, out -> {
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
final ResultSet resultSet = st.executeQuery(selectQuery);
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
} catch (final SQLException e) {
throw new ProcessException("Error during database query or conversion of records to Avro", e);
}
});
if (nrOfRows.get() > 0) {
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
String jdbcURL = "DBCPService"; String jdbcURL = "DBCPService";
try { try {
DatabaseMetaData databaseMetaData = con.getMetaData(); DatabaseMetaData databaseMetaData = con.getMetaData();
@ -202,18 +228,69 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
} catch (SQLException se) { } catch (SQLException se) {
// Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
} }
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
final ResultSet resultSet = st.executeQuery(selectQuery);
int fragmentIndex=0;
while(true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile fileToProcess = session.create();
fileToProcess = session.write(fileToProcess, out -> {
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile));
} catch (SQLException e) {
throw new ProcessException("Error during database query or conversion of records to Avro.", e);
}
});
if (nrOfRows.get() > 0) {
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
if(maxRowsPerFlowFile > 0) {
fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
}
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(fileToProcess, REL_SUCCESS);
resultSetFlowFiles.add(fileToProcess);
} else { } else {
// If there were no rows returned, don't send the flowfile // If there were no rows returned, don't send the flowfile
session.remove(fileToProcess); session.remove(fileToProcess);
context.yield(); context.yield();
break;
} }
fragmentIndex++;
}
//set count on all FlowFiles
if(maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
}
}
} catch (final SQLException e) {
throw e;
}
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (final ProcessException | SQLException e) { } catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
if (fileToProcess != null) { if (!resultSetFlowFiles.isEmpty()) {
session.remove(fileToProcess); session.remove(resultSetFlowFiles);
} }
context.yield(); context.yield();
} finally { } finally {
@ -228,18 +305,17 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
} }
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames, protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
StateMap stateMap) { Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) { if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified"); throw new IllegalArgumentException("Table name must be specified");
} }
final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
// Check state map for last max values // Check state map for last max values
if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) { if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
Map<String, String> stateProperties = stateMap.toMap();
List<String> whereClauses = new ArrayList<>(maxValColumnNames.size()); List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
for (String colName : maxValColumnNames) { for (String colName : maxValColumnNames) {
String maxValue = stateProperties.get(colName.toLowerCase()); String maxValue = stateMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) { if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(colName.toLowerCase()); Integer type = columnTypeMap.get(colName.toLowerCase());
if (type == null) { if (type == null) {
@ -260,6 +336,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
} }
protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
final Map<String,String> defaultMaxValues = new HashMap<String, String>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();
if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) {
continue;
}
defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
}
return defaultMaxValues;
}
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback { protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter; DatabaseAdapter dbAdapter;
Map<String, String> newColMap; Map<String, String> newColMap;

View File

@ -79,7 +79,11 @@ public class JdbcCommon {
return convertToAvroStream(rs, outStream, recordName, null); return convertToAvroStream(rs, outStream, recordName, null);
} }
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) throws IOException, SQLException {
return convertToAvroStream(rs, outStream, recordName, callback, 0);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows)
throws SQLException, IOException { throws SQLException, IOException {
final Schema schema = createSchema(rs, recordName); final Schema schema = createSchema(rs, recordName);
final GenericRecord rec = new GenericData.Record(schema); final GenericRecord rec = new GenericData.Record(schema);
@ -155,6 +159,9 @@ public class JdbcCommon {
} }
dataFileWriter.append(rec); dataFileWriter.append(rec);
nrOfRows += 1; nrOfRows += 1;
if(maxRows > 0 && nrOfRows == maxRows)
break;
} }
return nrOfRows; return nrOfRows;

View File

@ -30,6 +30,7 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
@ -43,6 +44,7 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
@ -50,9 +52,11 @@ import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Types; import java.sql.Types;
import java.util.Arrays; import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -133,18 +137,18 @@ public class QueryDatabaseTableTest {
StateManager stateManager = runner.getStateManager(); StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER); stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("id", Types.INTEGER); processor.putColumnType("id", Types.INTEGER);
query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER)); query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509", query); assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56"); maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER); stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("date_created", Types.TIMESTAMP); processor.putColumnType("date_created", Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER)); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
// Test Oracle strategy // Test Oracle strategy
dbAdapter = new OracleDatabaseAdapter(); dbAdapter = new OracleDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER)); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
} }
@ -377,6 +381,176 @@ public class QueryDatabaseTableTest {
assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty()); assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
} }
@Test
public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
int rowCount=0;
//create larger row set
for(int batch=0;batch<100;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");//Using a non-round number to make sure the last file is ragged
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 12);
//ensure all but the last file have 9 records each
for(int ff=0;ff<11;ff++) {
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
}
//last file should have 1 record
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Run again, this time should be a single partial flow file
for(int batch=0;batch<5;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
in = new ByteArrayInputStream(mff.toByteArray());
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(0), mff.getAttribute("fragment.index"));
assertEquals("1", mff.getAttribute("fragment.count"));
assertEquals(5, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again, this time should be a full batch and a partial
for(int batch=0;batch<14;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1).toByteArray());
assertEquals(5, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again with a cleaned state. Should get all rows split into batches
int ffCount = (int) Math.ceil((double)rowCount / 9D);
runner.getStateManager().clear(Scope.CLUSTER);
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount);
//ensure all but the last file have 9 records each
for(int ff=0;ff<ffCount-1;ff++) {
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
}
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ffCount-1).toByteArray());
assertEquals(rowCount%9, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
}
@Test
public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.setTimeInMillis(0);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
int rowCount=0;
//create larger row set
for(int batch=0;batch<10;batch++){
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
rowCount++;
cal.add(Calendar.MINUTE, 1);
}
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
cal.setTimeInMillis(0);
cal.add(Calendar.MINUTE, 5);
runner.setProperty("initial.maxvalue.created_on", dateFormat.format(cal.getTime().getTime()));
// Initial run with no previous state. Should get only last 4 records
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(4, getNumberOfRecordsFromStream(in));
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
// Validate Max Value doesn't change also
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
// Append a new row, expect 1 flowfile one row
cal.setTimeInMillis(0);
cal.add(Calendar.MINUTE, rowCount);
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
rowCount++;
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER);
runner.clearTransferState();
}
private long getNumberOfRecordsFromStream(InputStream in) throws IOException { private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {