NIFI-2712: Fixed Fetch processors for multiple max-value columns. This closes #976

This commit is contained in:
Matt Burgess 2016-08-31 09:00:16 -04:00 committed by Matt Gilman
parent a7e76cc00a
commit 2afc739ab7
5 changed files with 130 additions and 11 deletions

View File

@ -111,8 +111,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
.name("Maximum-value Columns")
.description("A comma-separated list of column names. The processor will keep track of the maximum value "
+ "for each column that has been returned since the processor started running. This can be used to "
+ "retrieve only those rows that have been added/updated since the last retrieval. Note that some "
+ "for each column that has been returned since the processor started running. Using multiple columns implies an order "
+ "to the column list, and each column's values are expected to increase more slowly than the previous columns' values. Thus, "
+ "using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. This processor "
+ "can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some "
+ "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
+ "types should not be listed in this property, and will result in error(s) during processing. If no columns "
+ "are provided, all rows from the table will be considered, which could have a performance impact.")

View File

@ -56,6 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@TriggerSerially
@ -160,7 +161,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
maxValueSelectColumns.add("COUNT(*)");
// For each maximum-value column, get a WHERE filter and a MAX(column) alias
maxValueColumnNameList.forEach(colName -> {
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = statePropertyMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
@ -170,7 +172,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
throw new IllegalArgumentException("No column type found for: " + colName);
}
// Add a condition for the WHERE clause
maxValueClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});

View File

@ -64,6 +64,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
@EventDriven
@ -327,7 +328,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
// Check state map for last max values
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
for (String colName : maxValColumnNames) {
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index);
String maxValue = stateMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(colName.toLowerCase());
@ -336,9 +338,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
throw new IllegalArgumentException("No column type found for: " + colName);
}
// Add a condition for the WHERE clause
whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});
if (!whereClauses.isEmpty()) {
query.append(" WHERE ");
query.append(StringUtils.join(whereClauses, " AND "));

View File

@ -121,7 +121,8 @@ public class QueryDatabaseTableTest {
}
@After
public void teardown() {
public void teardown() throws IOException {
runner.getStateManager().clear(Scope.CLUSTER);
runner = null;
QueryDatabaseTable.dbAdapters.clear();
QueryDatabaseTable.dbAdapters.putAll(origDbAdapters);
@ -149,12 +150,12 @@ public class QueryDatabaseTableTest {
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("date_created", Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
// Test Oracle strategy
dbAdapter = new OracleDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
}
@Test(expected = IllegalArgumentException.class)
@ -294,6 +295,67 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
}
@Test
public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
assertEquals("2",
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
);
runner.clearTransferState();
// Add a new row in the same bucket
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
assertEquals("1",
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
);
runner.clearTransferState();
// Add a new row in a new bucket
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
assertEquals("1",
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
);
runner.clearTransferState();
// Add a new row in an old bucket, it should not be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)");
runner.run();
runner.assertTransferCount(QueryDatabaseTable.REL_SUCCESS, 0);
// Add a new row in the second bucket, only the new row should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
assertEquals("1",
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
);
runner.clearTransferState();
}
@Test
public void testTimestampNanos() throws ClassNotFoundException, SQLException, InitializationException, IOException {

View File

@ -200,6 +200,57 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
@Test
public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET");
// Set partition size to 1 so we can compare flow files to records
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1");
runner.run();
runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 2);
runner.clearTransferState();
// Add a new row in the same bucket
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
runner.run();
runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
runner.clearTransferState();
// Add a new row in a new bucket
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)");
runner.run();
runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
runner.clearTransferState();
// Add a new row in an old bucket, it should not be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)");
runner.run();
runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 0);
// Add a new row in the second bucket, only the new row should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)");
runner.run();
runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
runner.clearTransferState();
}
/**
* Simple implementation only for ListDatabaseTables processor testing.