NIFI-3596 - added attributes to GenerateTableFetch processor

Signed-off-by: Matt Burgess <mattyb149@apache.org>

Updated test to check selected column names

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1596
This commit is contained in:
Pierre Villard 2017-03-14 23:19:05 +01:00 committed by Matt Burgess
parent a26689318d
commit ced6708d4b
2 changed files with 48 additions and 6 deletions

View File

@ -85,7 +85,14 @@ import java.util.stream.IntStream;
+ "per the State Management documentation")
@WritesAttributes({
@WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
+ "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message.")
+ "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
@WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
@WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
@WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
@WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
+ "that has been returned since the processor started running."),
@WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@ -140,6 +147,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
return super.customValidate(validationContext);
}
@Override
@OnScheduled
public void setup(final ProcessContext context) {
// Pre-fetch the column types if using a static table name and max-value columns
@ -291,9 +299,24 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
for (long i = 0; i < numberOfFetches; i++) {
// Boxed on purpose: a partition size of 0 is represented by a null limit/offset.
// Declaring these as primitive long would auto-unbox that null and throw a
// NullPointerException on the first iteration whenever partitionSize == 0.
// NOTE(review): assumes dbAdapter.getSelectStatement accepts nullable Long limit/offset - confirm.
final Long limit = partitionSize == 0 ? null : (long) partitionSize;
final Long offset = partitionSize == 0 ? null : i * partitionSize;
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset);
final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
if (columnNames != null) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
}
if (StringUtils.isNotBlank(whereClause)) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
}
if (StringUtils.isNotBlank(maxColumnNames)) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
}
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
if (partitionSize != 0) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
}
session.transfer(sqlFlowFile, REL_SUCCESS);
}

View File

@ -197,16 +197,23 @@ public class TestGenerateTableFetch {
// Set name as the max value column name (and clear the state); all rows should be returned since the max value for name has not been set
runner.getStateManager().clear(Scope.CLUSTER);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "name");
runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "id, name, scale, created_on");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4); // 7 records with partition size 2 means 4 generated FlowFiles
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
assertEquals("id, name, scale, created_on", flowFile.getAttribute("generatetablefetch.columnNames"));
assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause"));
assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
assertEquals("2", flowFile.getAttribute("generatetablefetch.limit"));
assertEquals("6", flowFile.getAttribute("generatetablefetch.offset"));
runner.clearTransferState();
}
@ -415,6 +422,12 @@ public class TestGenerateTableFetch {
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
// Note there is no WHERE clause here. Because we are using dynamic tables, the old state key/value is not retrieved
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames"));
assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause"));
assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit"));
assertEquals("0", flowFile.getAttribute("generatetablefetch.offset"));
runner.clearTransferState();
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
@ -428,6 +441,12 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames"));
assertEquals("id > 1", flowFile.getAttribute("generatetablefetch.whereClause"));
assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit"));
assertEquals("0", flowFile.getAttribute("generatetablefetch.offset"));
}
@Test