NIFI-1706: Extend QueryDatabaseTable to support arbitrary queries

- Only include Maximum Value columns in the type map.
- Squashed commits in the previous PR
- Rebased against the latest master
- Added stop method to GenerateTableFetch so that it refreshes the
column type map when it gets restarted
- Fixed whitespace around if/for statements
- Updated the expressionLanguageSupported value since it was not auto-merged correctly
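
For illustration, a minimal usage sketch based on the unit test added below (it omits the DBCP controller service setup that a real test or flow needs, so take it as a property-configuration example only):

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

final TestRunner runner = TestRunners.newTestRunner(QueryDatabaseTable.class);
// TABLE_NAME stays required: it aliases the wrapped sub-query and is set as a FlowFile attribute
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.SQL_QUERY,
        "SELECT id, b.type AS gender, b.descr, name FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type = b.type)");
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
// Internally the custom query is wrapped as: SELECT * FROM (<custom query>) AS TEST_QUERY_DB_TABLE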

This closes #2618.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
patricker authored 2017-09-19 13:50:06 +08:00; committed by Koji Kawamura
commit 82ac815536 (parent b7272e3f32)
4 changed files with 264 additions and 7 deletions

AbstractDatabaseFetchProcessor.java

@@ -44,6 +44,7 @@ import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@@ -152,12 +153,22 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
public static final PropertyDescriptor WHERE_CLAUSE = new PropertyDescriptor.Builder()
.name("db-fetch-where-clause")
.displayName("Additional WHERE clause")
.description("A custom clause to be added in the WHERE condition when generating SQL requests.")
.description("A custom clause to be added in the WHERE condition when building SQL queries.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SQL_QUERY = new PropertyDescriptor.Builder()
.name("db-fetch-sql-query")
.displayName("Custom Query")
.description("A custom SQL query used to retrieve data. Instead of building a SQL query from "
+ "other properties, this query will be wrapped as a sub-query. Query must have no ORDER BY statement.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected List<PropertyDescriptor> propDescriptors;
// The delimiter to use when referencing qualified names (such as table@!@column in the state map)
@@ -246,6 +257,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// Try to fill the columnTypeMap with the types of the desired max-value columns
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection();
@@ -254,7 +266,17 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
// to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
// approach as in Apache Drill
- String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
String query;
if (StringUtils.isEmpty(sqlQuery)) {
query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
} else {
StringBuilder sbQuery = getWrappedQuery(sqlQuery, tableName);
sbQuery.append(" WHERE 1=0");
query = sbQuery.toString();
}
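// Illustration (hypothetical table and columns, assuming the generic adapter): the probe is either
//   SELECT maxCol1, maxCol2 FROM myTable WHERE 1 = 0
// or, with a custom query,
//   SELECT * FROM (<custom query>) AS myTable WHERE 1=0
// Both return zero rows while still exposing the columns' metadata through the ResultSet.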
ResultSet resultSet = st.executeQuery(query);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int numCols = resultSetMetaData.getColumnCount();
@@ -262,12 +284,34 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
if (shouldCleanCache) {
columnTypeMap.clear();
}
final List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.toLowerCase().split(","));
final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();
for (String maxValueColumn:maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim());
maxValueQualifiedColumnNameList.add(colKey);
}
for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName);
//only include columns that are part of the maximum value tracking column list
if (!maxValueQualifiedColumnNameList.contains(colKey)) {
continue;
}
int colType = resultSetMetaData.getColumnType(i);
columnTypeMap.putIfAbsent(colKey, colType);
}
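// Verify that every requested max-value column was found in the result set; fail fast otherwise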
for (String maxValueColumn:maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase());
if (!columnTypeMap.containsKey(colKey)) {
throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
}
}
} else {
throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
}
@@ -279,6 +323,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
}
}
protected static StringBuilder getWrappedQuery(String sqlQuery, String tableName) {
return new StringBuilder("SELECT * FROM (" + sqlQuery + ") AS " + tableName);
}
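// For reference, a sketch of the wrapping with hypothetical arguments:
//   getWrappedQuery("SELECT id, name FROM PERSONS", "PERSONS")
//   yields: SELECT * FROM (SELECT id, name FROM PERSONS) AS PERSONS
// The table name doubles as the sub-query alias, which is why TABLE_NAME remains required with a custom query.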
protected static String getMaxValueFromRow(ResultSet resultSet,
int columnIndex,
Integer type,

GenerateTableFetch.java

@@ -28,6 +28,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -162,6 +163,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
}
@OnStopped
public void stop() {
// Clear the setup flag so that setup() runs again on restart and refreshes the column type map in case properties changed
setupComplete.set(false);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
// Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later

QueryDatabaseTable.java

@@ -79,7 +79,8 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"sql", "select", "jdbc", "query", "database"})
@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
@CapabilityDescription("Generates and executes a SQL select query to fetch all rows whose values in the specified Maximum Value column(s) are larger than the "
@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified "
+ "Maximum Value column(s) are larger than the "
+ "previously-seen maxima. Query result will be converted to Avro format. Expression Language is supported for several properties, but no incoming "
+ "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
+ "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
@@ -168,8 +169,13 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
pds.add(DB_TYPE);
- pds.add(TABLE_NAME);
pds.add(new PropertyDescriptor.Builder()
.fromPropertyDescriptor(TABLE_NAME)
.description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.")
.build());
pds.add(COLUMN_NAMES);
pds.add(WHERE_CLAUSE);
pds.add(SQL_QUERY);
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
@@ -180,7 +186,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE);
- pds.add(WHERE_CLAUSE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -220,6 +226,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -275,7 +282,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
- final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
@@ -404,10 +411,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
}
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
}
- final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
final StringBuilder query;
if (StringUtils.isEmpty(sqlQuery)) {
query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
} else {
query = getWrappedQuery(sqlQuery, tableName);
}
List<String> whereClauses = new ArrayList<>();
// Check state map for last max values
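// Illustration of the two branches (hypothetical names; exact SQL depends on the DatabaseAdapter): the base statement is either
//   SELECT id, name FROM myTable
// or, when a custom query is supplied,
//   SELECT * FROM (SELECT id, b.descr FROM myTable a JOIN types b ON a.type = b.type) AS myTable
// The max-value conditions and the Additional WHERE clause collected below are appended to either form.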

QueryDatabaseTableTest.java

@@ -986,6 +986,189 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
}
@Test
public void testCustomSQL() throws SQLException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
try {
stmt.execute("drop table TYPE_LIST");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))");
stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')");
stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')");
stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')");
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.SQL_QUERY, "SELECT id, b.type as gender, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)");
runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "gender = 'male'");
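// Note that the WHERE clause may reference the custom query's column aliases (gender here), because it is
// applied to the outer wrapper: SELECT * FROM (<custom query>) AS TEST_QUERY_DB_TABLE WHERE gender = 'male'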
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
assertEquals(flowFile.getAttribute("maxvalue.id"), "0");
InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Remove Max Rows Per Flow File
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
// Add a new row with a higher ID and run; no flow file should be transferred because the new row is female and the gender = 'male' WHERE clause filters it out
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Sanity check - run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Add timestamp as a max value column name
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, created_on");
// Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
in = new ByteArrayInputStream(flowFile.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
// Add a new row with a higher ID and run, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(4, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Set scale as the max value column name (and clear the state), all rows should be returned since the max value for scale has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(5, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a higher value for scale than the max, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Set bignum as the max value column name (and clear the state), all rows should be returned since the max value for bignum has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(6, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Add a new row with a higher bignum than the max, but a female one; no flow file will be transferred because the gender = 'male' WHERE clause filters it out
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
}
@Test(expected = AssertionError.class)
public void testMissingColumn() throws ProcessException, ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
try {
stmt.execute("drop table TYPE_LIST");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))");
stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')");
stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')");
stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')");
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TYPE_LIST");
runner.setProperty(QueryDatabaseTable.SQL_QUERY, "SELECT b.type, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)");
runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "type = 'male'");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
runner.run();
}
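// The expected AssertionError above originates in setup(): the max-value column ID is not in the custom
// query's select list, so the "Column not found in the table/query specified" ProcessException is thrown
// and surfaced by the test runner as an AssertionError.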
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {