NIFI-5339 - Better Time Tracking for ExecuteSQL Durations

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2817.
This commit is contained in:
patricker 2018-06-26 13:27:11 -06:00 committed by Pierre Villard
parent 6508c191bf
commit c1083dfb62
2 changed files with 22 additions and 4 deletions

View File

@ -100,6 +100,8 @@ public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
// Relationships
@ -203,7 +205,6 @@ public class ExecuteSQL extends AbstractProcessor {
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
final StopWatch stopWatch = new StopWatch(true);
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
@ -224,7 +225,12 @@ public class ExecuteSQL extends AbstractProcessor {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery});
final StopWatch executionTime = new StopWatch(true);
boolean hasResults = st.execute();
long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
boolean hasUpdateCount = st.getUpdateCount() != -1;
while(hasResults || hasUpdateCount) {
@ -238,6 +244,8 @@ public class ExecuteSQL extends AbstractProcessor {
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
}
final StopWatch fetchTime = new StopWatch(true);
final AtomicLong nrOfRows = new AtomicLong(0L);
resultSetFF = session.write(resultSetFF, out -> {
try {
@ -255,17 +263,19 @@ public class ExecuteSQL extends AbstractProcessor {
}
});
long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attribute how many rows were selected
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{resultSetFF, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
session.transfer(resultSetFF, REL_SUCCESS);
resultCount++;

View File

@ -312,9 +312,17 @@ public class TestExecuteSQL {
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_FETCH_TIME);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_ROW_COUNT);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {