NIFI-6348: Added Custom ORDER BY Column property to GenerateTableFetch

NIFI-6348: Fixed doc

This closes #3515.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matthew Burgess 2019-06-04 15:50:41 -04:00 committed by Bryan Bende
parent 60b5c13ce9
commit 1d18735076
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 64 additions and 1 deletions

View File

@ -154,6 +154,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
/**
 * Optional property naming the column used in the generated ORDER BY clause when partitioning is
 * enabled but no Max-Value Columns are configured. Supports Expression Language evaluated against
 * incoming FlowFile attributes; when set, the value must be non-empty.
 */
static final PropertyDescriptor CUSTOM_ORDERBY_COLUMN = new PropertyDescriptor.Builder()
        .name("gen-table-custom-orderby-column")
        .displayName("Custom ORDER BY Column")
        .description("The name of a column to be used for ordering the results if Max-Value Columns are not provided and partitioning is enabled. This property is ignored if either "
                + "Max-Value Columns is set or Partition Size = 0. NOTE: If neither Max-Value Columns nor Custom ORDER BY Column is set, then depending on the "
                // The previous literal already ends with "the "; do not repeat the article here.
                + "database/driver, the processor may report an error and/or the generated SQL may result in missing and/or duplicate rows. This is because without an explicit "
                + "ordering, fetching each partition is done using an arbitrary ordering.")
        .required(false)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
@ -176,6 +188,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
pds.add(PARTITION_SIZE);
pds.add(COLUMN_FOR_VALUE_PARTITIONING);
pds.add(WHERE_CLAUSE);
pds.add(CUSTOM_ORDERBY_COLUMN);
pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS);
propDescriptors = Collections.unmodifiableList(pds);
}
@ -260,6 +273,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
final String customOrderByColumn = context.getProperty(CUSTOM_ORDERBY_COLUMN).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
final StateManager stateManager = context.getStateManager();
@ -497,7 +511,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
// Don't use an ORDER BY clause if there's only one partition
final String orderByClause = partitionSize == 0 ? null : maxColumnNames;
final String orderByClause = partitionSize == 0 ? null : (maxColumnNames.isEmpty() ? customOrderByColumn : maxColumnNames);
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, columnForPartitioning);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);

View File

@ -1609,6 +1609,55 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
/**
 * Verifies that, with no Max-Value Columns configured and a Partition Size of 2, the value of
 * Custom ORDER BY Column is used as the ORDER BY clause of every generated page query, and that
 * the generated queries execute against the test table and return the expected number of rows.
 *
 * All JDBC resources opened by the test are closed via try-with-resources so a failing assertion
 * does not leave a connection, statement or result set open.
 */
@Test
public void testCustomOrderByColumn() throws SQLException, IOException {
    // load test data to database
    try (final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
         final Statement stmt = con.createStatement()) {

        try {
            stmt.execute("drop table TEST_QUERY_DB_TABLE");
        } catch (final SQLException sqle) {
            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
        }

        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");

        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
        runner.setIncomingConnection(false);
        runner.setProperty(GenerateTableFetch.CUSTOM_ORDERBY_COLUMN, "SCALE");
        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");

        runner.run();
        // Three rows with a partition size of two yields two page queries
        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);

        // First page: no OFFSET, ordered by the custom column
        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
        String query = new String(flowFile.toByteArray());
        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY SCALE FETCH NEXT 2 ROWS ONLY", query);
        flowFile.assertAttributeEquals(FRAGMENT_INDEX, "0");
        flowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
        try (final ResultSet resultSet = stmt.executeQuery(query)) {
            // Should be two records
            assertTrue(resultSet.next());
            assertTrue(resultSet.next());
            assertFalse(resultSet.next());
        }

        // Second page: OFFSET 2, same ordering
        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
        query = new String(flowFile.toByteArray());
        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY SCALE OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
        flowFile.assertAttributeEquals(FRAGMENT_INDEX, "1");
        flowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
        try (final ResultSet resultSet = stmt.executeQuery(query)) {
            // Should be one record
            assertTrue(resultSet.next());
            assertFalse(resultSet.next());
        }
    }
    runner.clearTransferState();
}
/**
* Simple implementation only for GenerateTableFetch processor testing.
*/