diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index a5473938fa..b4ba9fee30 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -142,6 +142,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS = new PropertyDescriptor.Builder() + .name("gen-table-output-flowfile-on-zero-results") + .displayName("Output Empty FlowFile on Zero Results") + .description("Depending on the specified properties, an execution of this processor may not result in any SQL statements generated. When this property " + + "is true, an empty flow file will be generated (having the parent of the incoming flow file if present) and transferred to the 'success' relationship. " + + "When this property is false, no output flow files will be generated.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. 
" @@ -164,6 +176,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { pds.add(PARTITION_SIZE); pds.add(COLUMN_FOR_VALUE_PARTITIONING); pds.add(WHERE_CLAUSE); + pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS); propDescriptors = Collections.unmodifiableList(pds); } @@ -247,6 +260,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue(); final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning); final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue(); + final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean(); final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -435,49 +449,75 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { } // Generate SQL statements to read "pages" of data - Long limit = partitionSize == 0 ? 
null : (long) partitionSize; final String fragmentIdentifier = UUID.randomUUID().toString(); - for (long i = 0; i < numberOfFetches; i++) { - // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) - if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) { - maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning); - limit = null; - } + List<FlowFile> flowFilesToTransfer = new ArrayList<>(); + + Map<String, String> baseAttributes = new HashMap<>(); + baseAttributes.put("generatetablefetch.tableName", tableName); + if (columnNames != null) { + baseAttributes.put("generatetablefetch.columnNames", columnNames); + } + + final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); + if (StringUtils.isNotBlank(maxColumnNames)) { + baseAttributes.put("generatetablefetch.maxColumnNames", maxColumnNames); + } + + baseAttributes.put(FRAGMENT_ID, fragmentIdentifier); + baseAttributes.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + + // If there are no SQL statements to be generated, still output an empty flow file if specified by the user + if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) { + FlowFile emptyFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); + Map<String, String> attributesToAdd = new HashMap<>(); - //Update WHERE list to include new right hand boundaries whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); + attributesToAdd.put("generatetablefetch.whereClause", whereClause); - Long offset = partitionSize == 0 ?
minValueForPartitioning : 0); - - final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); - final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); - FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); - sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); - Map attributesToAdd = new HashMap<>(); - - attributesToAdd.put("generatetablefetch.tableName", tableName); - if (columnNames != null) { - attributesToAdd.put("generatetablefetch.columnNames", columnNames); - } - if (StringUtils.isNotBlank(whereClause)) { - attributesToAdd.put("generatetablefetch.whereClause", whereClause); - } - if (StringUtils.isNotBlank(maxColumnNames)) { - attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames); - } - attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit)); + attributesToAdd.put("generatetablefetch.limit", null); if (partitionSize != 0) { - attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset)); + attributesToAdd.put("generatetablefetch.offset", null); } // Add fragment attributes - attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); - attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i)); - attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(0)); - sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd); - session.transfer(sqlFlowFile, REL_SUCCESS); + attributesToAdd.putAll(baseAttributes); + emptyFlowFile = session.putAllAttributes(emptyFlowFile, attributesToAdd); + flowFilesToTransfer.add(emptyFlowFile); + } else { + Long limit = partitionSize == 0 ? 
null : (long) partitionSize; + for (long i = 0; i < numberOfFetches; i++) { + // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) + if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) { + maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning); + limit = null; + } + + //Update WHERE list to include new right hand boundaries + whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); + Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0); + + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); + FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); + sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); + Map<String, String> attributesToAdd = new HashMap<>(); + + attributesToAdd.put("generatetablefetch.whereClause", whereClause); + attributesToAdd.put("generatetablefetch.limit", (limit == null) ?
null : limit.toString()); + if (partitionSize != 0) { + attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset)); + } + // Add fragment attributes + attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i)); + + attributesToAdd.putAll(baseAttributes); + sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd); + flowFilesToTransfer.add(sqlFlowFile); + } } + session.transfer(flowFilesToTransfer, REL_SUCCESS); + if (fileToProcess != null) { session.remove(fileToProcess); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 6e0c3971e5..8ccca2c4ec 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -54,6 +54,7 @@ import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.REL_SUCCESS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; @@ -539,10 +540,54 @@ public class TestGenerateTableFetch { runner.run(); runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); - runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID"); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0); + 
flowFile.assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID"); + flowFile.assertAttributeExists("generatetablefetch.limit"); + flowFile.assertAttributeEquals("generatetablefetch.limit", null); runner.clearTransferState(); } + @Test + public void testFlowFileGeneratedOnZeroResults() throws SQLException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "ID,BUCKET"); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + // Use a nonzero partition size; since the table is empty, no fetch SQL statements will be generated + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1"); + runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "false"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 0); + runner.clearTransferState(); + + runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName")); + assertEquals("ID,BUCKET", flowFile.getAttribute("generatetablefetch.columnNames")); + assertEquals("1=1", flowFile.getAttribute("generatetablefetch.whereClause")); + assertEquals("ID",
flowFile.getAttribute("generatetablefetch.maxColumnNames")); + assertNull(flowFile.getAttribute("generatetablefetch.limit")); + assertNull(flowFile.getAttribute("generatetablefetch.offset")); + assertEquals("0", flowFile.getAttribute("fragment.index")); + assertEquals("0", flowFile.getAttribute("fragment.count")); + } + @Test public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {