NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch

Co-authored-by: Peter Wicks <patricker@gmail.com>
Signed-off-by: Peter Wicks <patricker@gmail.com>

This closes #3075.
Matthew Burgess 2018-11-01 22:09:13 -04:00 committed by Peter Wicks
parent 75906226a6
commit 0207d0813e
2 changed files with 119 additions and 34 deletions

GenerateTableFetch.java

@@ -142,6 +142,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS = new PropertyDescriptor.Builder()
+            .name("gen-table-output-flowfile-on-zero-results")
+            .displayName("Output Empty FlowFile on Zero Results")
+            .description("Depending on the specified properties, an execution of this processor may not result in any SQL statements generated. When this property "
+                    + "is true, an empty flow file will be generated (having the parent of the incoming flow file if present) and transferred to the 'success' relationship. "
+                    + "When this property is false, no output flow files will be generated.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
             .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
@@ -164,6 +176,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
         pds.add(PARTITION_SIZE);
         pds.add(COLUMN_FOR_VALUE_PARTITIONING);
         pds.add(WHERE_CLAUSE);
+        pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS);
         propDescriptors = Collections.unmodifiableList(pds);
     }
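
For orientation, here is a minimal sketch (not part of this commit) of exercising the property defined and registered above through NiFi's standard test harness. The DBCP controller-service wiring that GenerateTableFetch also requires is omitted; it is configured the same way as in the tests further below.

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Illustrative configuration only; assumes a DBCP service has already been
    // added and enabled on the runner.
    TestRunner runner = TestRunners.newTestRunner(GenerateTableFetch.class);
    runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
    // Opt in: when no SQL is generated, emit one empty FlowFile to 'success'
    // instead of producing no output at all.
    runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "true");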
@@ -247,6 +260,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
         final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
         final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
         final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
+        final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
 
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
@@ -435,49 +449,75 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
             }
 
             // Generate SQL statements to read "pages" of data
-            Long limit = partitionSize == 0 ? null : (long) partitionSize;
             final String fragmentIdentifier = UUID.randomUUID().toString();
-            for (long i = 0; i < numberOfFetches; i++) {
-                // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
-                if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
-                    maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
-                    limit = null;
-                }
+            List<FlowFile> flowFilesToTransfer = new ArrayList<>();
+
+            Map<String, String> baseAttributes = new HashMap<>();
+            baseAttributes.put("generatetablefetch.tableName", tableName);
+            if (columnNames != null) {
+                baseAttributes.put("generatetablefetch.columnNames", columnNames);
+            }
+
+            final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
+            if (StringUtils.isNotBlank(maxColumnNames)) {
+                baseAttributes.put("generatetablefetch.maxColumnNames", maxColumnNames);
+            }
+
+            baseAttributes.put(FRAGMENT_ID, fragmentIdentifier);
+            baseAttributes.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches));
+
+            // If there are no SQL statements to be generated, still output an empty flow file if specified by the user
+            if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) {
+                FlowFile emptyFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
+                Map<String, String> attributesToAdd = new HashMap<>();
 
                 //Update WHERE list to include new right hand boundaries
                 whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
+                attributesToAdd.put("generatetablefetch.whereClause", whereClause);
 
-                Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
-                final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
-                final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning);
-
-                FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
-                sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
-
-                Map<String,String> attributesToAdd = new HashMap<>();
-                attributesToAdd.put("generatetablefetch.tableName", tableName);
-                if (columnNames != null) {
-                    attributesToAdd.put("generatetablefetch.columnNames", columnNames);
-                }
-                if (StringUtils.isNotBlank(whereClause)) {
-                    attributesToAdd.put("generatetablefetch.whereClause", whereClause);
-                }
-                if (StringUtils.isNotBlank(maxColumnNames)) {
-                    attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames);
-                }
-                attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit));
+                attributesToAdd.put("generatetablefetch.limit", null);
                 if (partitionSize != 0) {
-                    attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset));
+                    attributesToAdd.put("generatetablefetch.offset", null);
                 }
 
                 // Add fragment attributes
-                attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
-                attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
-                attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches));
+                attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(0));
 
-                sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd);
-                session.transfer(sqlFlowFile, REL_SUCCESS);
+                attributesToAdd.putAll(baseAttributes);
+                emptyFlowFile = session.putAllAttributes(emptyFlowFile, attributesToAdd);
+                flowFilesToTransfer.add(emptyFlowFile);
+            } else {
+                Long limit = partitionSize == 0 ? null : (long) partitionSize;
+                for (long i = 0; i < numberOfFetches; i++) {
+                    // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
+                    if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
+                        maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
+                        limit = null;
+                    }
+
+                    //Update WHERE list to include new right hand boundaries
+                    whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
+
+                    Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
+                    final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning);
+
+                    FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
+                    sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
+
+                    Map<String,String> attributesToAdd = new HashMap<>();
+                    attributesToAdd.put("generatetablefetch.whereClause", whereClause);
+                    attributesToAdd.put("generatetablefetch.limit", (limit == null) ? null : limit.toString());
+                    if (partitionSize != 0) {
+                        attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset));
+                    }
+
+                    // Add fragment attributes
+                    attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
+
+                    attributesToAdd.putAll(baseAttributes);
+                    sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd);
+                    flowFilesToTransfer.add(sqlFlowFile);
+                }
+            }
+
+            session.transfer(flowFilesToTransfer, REL_SUCCESS);
 
             if (fileToProcess != null) {
                 session.remove(fileToProcess);
             }

TestGenerateTableFetch.java

@@ -54,6 +54,7 @@ import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
 import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.REL_SUCCESS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
@@ -539,10 +540,54 @@ public class TestGenerateTableFetch {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
-        runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID");
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID");
+        flowFile.assertAttributeExists("generatetablefetch.limit");
+        flowFile.assertAttributeEquals("generatetablefetch.limit", null);
         runner.clearTransferState();
     }
 
+    @Test
+    public void testFlowFileGeneratedOnZeroResults() throws SQLException {
+        // Create an empty table (no test data is loaded)
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "ID,BUCKET");
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+        // Set partition size to 1; the table is empty, so no SQL statements are generated
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1");
+
+        runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "false");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "true");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
+        assertEquals("ID,BUCKET", flowFile.getAttribute("generatetablefetch.columnNames"));
+        assertEquals("1=1", flowFile.getAttribute("generatetablefetch.whereClause"));
+        assertEquals("ID", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
+        assertNull(flowFile.getAttribute("generatetablefetch.limit"));
+        assertNull(flowFile.getAttribute("generatetablefetch.offset"));
+        assertEquals("0", flowFile.getAttribute("fragment.index"));
+        assertEquals("0", flowFile.getAttribute("fragment.count"));
+    }
+
     @Test
     public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {
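
Downstream of GenerateTableFetch, one hypothetical way (not part of this commit) to act on the zero-results signal is to branch on the fragment.count attribute, which is "0" only on the empty FlowFile:

    import org.apache.nifi.processors.standard.RouteOnAttribute;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // With RouteOnAttribute's default "Route to Property name" strategy, each
    // dynamic property becomes a relationship; the empty zero-results FlowFile
    // matches the expression and is routed to a 'no-results' relationship.
    TestRunner route = TestRunners.newTestRunner(RouteOnAttribute.class);
    route.setProperty("no-results", "${fragment.count:equals('0')}");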