NIFI-4355 - query execution time as attribute of ExecuteSQL

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2129
This commit is contained in:
Pierre Villard 2017-09-06 15:28:48 +02:00 committed by Matthew Burgess
parent 0536c3edf1
commit 458c987fe3
2 changed files with 12 additions and 3 deletions

View File

@ -36,6 +36,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -68,10 +69,14 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query")
@WritesAttributes({
@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
@WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds")
})
public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -217,14 +222,16 @@ public class ExecuteSQL extends AbstractProcessor {
}
});
long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_DURATION, String.valueOf(duration));
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", duration);
session.transfer(fileToProcess, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
if (fileToProcess == null) {

View File

@ -262,6 +262,8 @@ public class TestExecuteSQL {
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_ROW_COUNT);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());