diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 7b304795f0..962e74effd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -111,8 +111,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder() .name("Maximum-value Columns") .description("A comma-separated list of column names. The processor will keep track of the maximum value " - + "for each column that has been returned since the processor started running. This can be used to " - + "retrieve only those rows that have been added/updated since the last retrieval. Note that some " + + "for each column that has been returned since the processor started running. Using multiple columns implies an order " + + "to the column list, and each column's values are expected to increase more slowly than the previous columns' values. Thus, " + + "using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. This processor " + + "can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some " + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these " + "types should not be listed in this property, and will result in error(s) during processing. If no columns " + "are provided, all rows from the table will be considered, which could have a performance impact.") diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 23a2e3c941..f1c86f8525 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -56,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; @TriggerSerially @@ -160,7 +161,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("COUNT(*)"); // For each maximum-value column, get a WHERE filter and a MAX(column) alias - maxValueColumnNameList.forEach(colName -> { + IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> { + String colName = maxValueColumnNameList.get(index); maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = statePropertyMap.get(colName.toLowerCase()); if (!StringUtils.isEmpty(maxValue)) { @@ -170,7 +172,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { throw new IllegalArgumentException("No column type found for: " + colName); } // Add a condition for the WHERE clause - maxValueClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName())); + maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName())); } }); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index cfe68b5bfa..ed578543ba 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -64,6 +64,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; @EventDriven @@ -325,9 +326,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); // Check state map for last max values - if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) { + if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) { List whereClauses = new ArrayList<>(maxValColumnNames.size()); - for (String colName : maxValColumnNames) { + IntStream.range(0, maxValColumnNames.size()).forEach((index) -> { + String colName = maxValColumnNames.get(index); String maxValue = stateMap.get(colName.toLowerCase()); if (!StringUtils.isEmpty(maxValue)) { Integer type = columnTypeMap.get(colName.toLowerCase()); @@ -336,9 +338,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { throw new IllegalArgumentException("No column type found for: " + colName); } // Add a condition for the WHERE clause - whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName())); + whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName())); } - } + }); if (!whereClauses.isEmpty()) { query.append(" WHERE "); query.append(StringUtils.join(whereClauses, " AND ")); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 979dd387cc..974a835808 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -121,7 +121,8 @@ public class QueryDatabaseTableTest { } @After - public void teardown() { + public void teardown() throws IOException { + runner.getStateManager().clear(Scope.CLUSTER); runner = null; QueryDatabaseTable.dbAdapters.clear(); QueryDatabaseTable.dbAdapters.putAll(origDbAdapters); @@ -149,12 +150,12 @@ public class QueryDatabaseTableTest { stateManager.setState(maxValues, Scope.CLUSTER); processor.putColumnType("date_created", Types.TIMESTAMP); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); - assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query); // Test Oracle strategy dbAdapter = new OracleDatabaseAdapter(); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); - assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query); } @Test(expected = IllegalArgumentException.class) @@ -294,6 +295,67 @@ public class QueryDatabaseTableTest { runner.clearTransferState(); } + @Test + public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + assertEquals("2", + runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT) + ); + runner.clearTransferState(); + + // Add a new row in the same bucket + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + assertEquals("1", + runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT) + ); + runner.clearTransferState(); + + // Add a new row in a new bucket + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + assertEquals("1", + runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT) + ); + runner.clearTransferState(); + + // Add a new row in an old bucket, it should not be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)"); + runner.run(); + runner.assertTransferCount(QueryDatabaseTable.REL_SUCCESS, 0); + + // Add a new row in the second bucket, only the new row should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + assertEquals("1", + runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT) + ); + runner.clearTransferState(); + } + @Test public void testTimestampNanos() throws ClassNotFoundException, SQLException, InitializationException, IOException { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 2f2fbd102e..3b9f3a354e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -200,6 +200,57 @@ public class TestGenerateTableFetch { runner.clearTransferState(); } + @Test + public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET"); + // Set partition size to 1 so we can compare flow files to records + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Add a new row in the same bucket + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); + runner.clearTransferState(); + + // Add a new row in a new bucket + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)"); + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); + runner.clearTransferState(); + + // Add a new row in an old bucket, it should not be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)"); + runner.run(); + runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 0); + + // Add a new row in the second bucket, only the new row should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)"); + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); + runner.clearTransferState(); + } + /** * Simple implementation only for ListDatabaseTables processor testing.