NIFI-4257 - add custom WHERE clause in database fetch processors

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2050
This commit is contained in:
Pierre Villard 2017-08-02 15:22:31 +02:00 committed by Matthew Burgess
parent ae30c7f350
commit c10ff574c4
5 changed files with 288 additions and 17 deletions

View File

@ -146,6 +146,15 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor WHERE_CLAUSE = new PropertyDescriptor.Builder()
.name("db-fetch-where-clause")
.displayName("Additional WHERE clause")
.description("A custom clause to be added in the WHERE condition when generating SQL requests.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected List<PropertyDescriptor> propDescriptors; protected List<PropertyDescriptor> propDescriptors;
// The delimiter to use when referencing qualified names (such as table@!@column in the state map) // The delimiter to use when referencing qualified names (such as table@!@column in the state map)

View File

@ -132,6 +132,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES); pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT); pds.add(QUERY_TIMEOUT);
pds.add(PARTITION_SIZE); pds.add(PARTITION_SIZE);
pds.add(WHERE_CLAUSE);
propDescriptors = Collections.unmodifiableList(pds); propDescriptors = Collections.unmodifiableList(pds);
} }
@ -185,6 +186,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue(); final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger(); final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
final StateManager stateManager = context.getStateManager(); final StateManager stateManager = context.getStateManager();
final StateMap stateMap; final StateMap stateMap;
@ -248,6 +250,11 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
} }
}); });
if(customWhereClause != null) {
// adding the custom WHERE clause (if defined) to the list of existing clauses.
maxValueClauses.add("(" + customWhereClause + ")");
}
whereClause = StringUtils.join(maxValueClauses, " AND "); whereClause = StringUtils.join(maxValueClauses, " AND ");
columnsClause = StringUtils.join(maxValueSelectColumns, ", "); columnsClause = StringUtils.join(maxValueSelectColumns, ", ");

View File

@ -161,6 +161,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(USE_AVRO_LOGICAL_TYPES); pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(DEFAULT_PRECISION); pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE); pds.add(DEFAULT_SCALE);
pds.add(WHERE_CLAUSE);
propDescriptors = Collections.unmodifiableList(pds); propDescriptors = Collections.unmodifiableList(pds);
} }
@ -192,6 +193,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue(); final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
@ -243,7 +245,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null ? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*")); : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, statePropertyMap); final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString(); final String fragmentIdentifier = UUID.randomUUID().toString();
@ -363,15 +365,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
} }
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames, protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
Map<String, String> stateMap) { String customWhereClause, Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) { if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified"); throw new IllegalArgumentException("Table name must be specified");
} }
final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
List<String> whereClauses = new ArrayList<>();
// Check state map for last max values // Check state map for last max values
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) { if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> { IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index); String colName = maxValColumnNames.get(index);
String maxValueKey = getStateKey(tableName, colName); String maxValueKey = getStateKey(tableName, colName);
@ -392,10 +394,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName())); whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
} }
}); });
if (!whereClauses.isEmpty()) { }
query.append(" WHERE ");
query.append(StringUtils.join(whereClauses, " AND ")); if (customWhereClause != null) {
} whereClauses.add("(" + customWhereClause + ")");
}
if (!whereClauses.isEmpty()) {
query.append(" WHERE ");
query.append(StringUtils.join(whereClauses, " AND "));
} }
return query.toString(); return query.toString();

View File

@ -45,13 +45,13 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException; import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Types; import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Calendar; import java.util.Calendar;
import java.util.Collections; import java.util.Collections;
@ -132,12 +132,12 @@ public class QueryDatabaseTableTest {
@Test @Test
public void testGetQuery() throws Exception { public void testGetQuery() throws Exception {
String query = processor.getQuery(dbAdapter, "myTable", null, null, null); String query = processor.getQuery(dbAdapter, "myTable", null, null, null, null);
assertEquals("SELECT * FROM myTable", query); assertEquals("SELECT * FROM myTable", query);
query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null); query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null, null);
assertEquals("SELECT col1,col2 FROM myTable", query); assertEquals("SELECT col1,col2 FROM myTable", query);
query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null); query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, null);
assertEquals("SELECT * FROM myTable", query); assertEquals("SELECT * FROM myTable", query);
Map<String, String> maxValues = new HashMap<>(); Map<String, String> maxValues = new HashMap<>();
@ -145,24 +145,24 @@ public class QueryDatabaseTableTest {
StateManager stateManager = runner.getStateManager(); StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER); stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER); processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap()); query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509", query); assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56"); maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER); stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP); processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
// Test Oracle strategy // Test Oracle strategy
dbAdapter = new OracleDatabaseAdapter(); dbAdapter = new OracleDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56'", query); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testGetQueryNoTable() throws Exception { public void testGetQueryNoTable() throws Exception {
processor.getQuery(dbAdapter, null, null, null, null); processor.getQuery(dbAdapter, null, null, null, null, null);
} }
@Test @Test
@ -587,7 +587,7 @@ public class QueryDatabaseTableTest {
runner.clearTransferState(); runner.clearTransferState();
// Run again with a cleaned state. Should get all rows split into batches // Run again with a cleaned state. Should get all rows split into batches
int ffCount = (int) Math.ceil((double)rowCount / 9D); int ffCount = (int) Math.ceil(rowCount / 9D);
runner.getStateManager().clear(Scope.CLUSTER); runner.getStateManager().clear(Scope.CLUSTER);
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount);
@ -717,6 +717,139 @@ public class QueryDatabaseTableTest {
runner.clearTransferState(); runner.clearTransferState();
} }
@Test
public void testAddedRowsCustomWhereClause() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "type = 'male'");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
assertEquals(flowFile.getAttribute("maxvalue.id"), "0");
InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
//Remove Max Rows Per Flow File
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
// Add a new row with a higher ID and run, one flowfile with one new row should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Sanity check - run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Add timestamp as a max value column name
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, created_on");
// Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
in = new ByteArrayInputStream(flowFile.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Add a new row with a higher ID and run, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(4, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Set scale as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(5, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a higher value for scale than the max, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Set scale as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(6, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a higher value for scale than the max, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
}
private long getNumberOfRecordsFromStream(InputStream in) throws IOException { private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {

View File

@ -924,6 +924,121 @@ public class TestGenerateTableFetch {
runner.clearTransferState(); runner.clearTransferState();
} }
@Test
public void testAddedRowsWithCustomWhereClause() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(GenerateTableFetch.WHERE_CLAUSE, "type = 'male' OR type IS NULL");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ " AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
ResultSet resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
runner.clearTransferState();
// Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
// Verify first flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND (type = 'male' OR type IS NULL)"
+ " AND ID <= 5 ORDER BY ID FETCH NEXT 1 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
// Verify second flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND (type = 'male' OR type IS NULL)"
+ " AND ID <= 5 ORDER BY ID OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
// Add a new row with a higher ID and run, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 AND (type = 'male' OR type IS NULL)"
+ " AND ID <= 6 ORDER BY ID FETCH NEXT 1 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
// Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "name");
runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "id, type, name, scale, created_on");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 5); // 5 records with partition size 1 means 5 generated FlowFiles
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ " AND name <= 'Mr. NiFi' ORDER BY name FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 2 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 3 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(4);
assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 4 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
assertEquals("id, type, name, scale, created_on", flowFile.getAttribute("generatetablefetch.columnNames"));
assertEquals("(type = 'male' OR type IS NULL) AND name <= 'Mr. NiFi'", flowFile.getAttribute("generatetablefetch.whereClause"));
assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
assertEquals("1", flowFile.getAttribute("generatetablefetch.limit"));
assertEquals("4", flowFile.getAttribute("generatetablefetch.offset"));
runner.clearTransferState();
}
/** /**
* Simple implementation only for GenerateTableFetch processor testing. * Simple implementation only for GenerateTableFetch processor testing.