diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 3f05766649..ad795953c4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -36,6 +36,7 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -68,10 +69,14 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") -@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query") +@WritesAttributes({ + @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"), + @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds") +}) public class ExecuteSQL extends AbstractProcessor { public static final String RESULT_ROW_COUNT = "executesql.row.count"; + public static final String RESULT_QUERY_DURATION = "executesql.query.duration"; // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -217,14 +222,16 @@ public class ExecuteSQL extends AbstractProcessor { } }); + long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + // set attribute how many rows were selected fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_DURATION, String.valueOf(duration)); fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()}); - session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", - stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", duration); session.transfer(fileToProcess, REL_SUCCESS); } catch (final ProcessException | SQLException e) { if (fileToProcess == null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 5659e4a1b1..5fd1af8200 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -262,6 +262,8 @@ public class TestExecuteSQL { runner.run(); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_DURATION); + runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_ROW_COUNT); final List flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS); final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());