NIFI-7859: Capture query execution time, fetch time, and combined duration as FlowFile attributes in the SelectHiveQL and SelectHive3QL processors

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4560
This commit is contained in:
Mohammed Nadeem 2020-09-29 23:20:00 +05:30 committed by Matthew Burgess
parent b2489f7644
commit e2ccfbbacf
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 186 additions and 2 deletions

View File

@ -80,6 +80,12 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."),
@WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."),
@WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query."),
@WritesAttribute(attribute = "selecthiveql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds. "
+ "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
@WritesAttribute(attribute = "selecthiveql.query.executiontime", description = "Duration of the query execution time in milliseconds. "
+ "This number will reflect the query execution time regardless of the 'Max Rows Per Flow File' setting."),
@WritesAttribute(attribute = "selecthiveql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds. "
+ "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
@ -94,6 +100,9 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
public class SelectHiveQL extends AbstractHiveQLProcessor {
public static final String RESULT_ROW_COUNT = "selecthiveql.row.count";
public static final String RESULT_QUERY_DURATION = "selecthiveql.query.duration";
public static final String RESULT_QUERY_EXECUTION_TIME = "selecthiveql.query.executiontime";
public static final String RESULT_QUERY_FETCH_TIME = "selecthiveql.query.fetchtime";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -372,6 +381,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
}
}
final StopWatch executionTime = new StopWatch(true);
final ResultSet resultSet;
try {
@ -382,11 +392,14 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
fileToProcess = null;
throw se;
}
long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
int fragmentIndex = 0;
String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
while (true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
final StopWatch fetchTime = new StopWatch(true);
flowfile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
if (baseFilename == null) {
baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
@ -412,6 +425,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
resultSetFlowFiles.add(flowfile);
throw e;
}
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) {
final Map<String, String> attributes = new HashMap<>();
@ -440,6 +454,10 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
attributes.put("fragment.index", String.valueOf(fragmentIndex));
}
attributes.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
attributes.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
attributes.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
flowfile = session.putAllAttributes(flowfile, attributes);
logger.info("{} contains {} " + outputFormat + " records; transferring to 'success'",

View File

@ -174,7 +174,18 @@ public class TestSelectHiveQL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHiveQL.RESULT_ROW_COUNT, "2");
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS);
flowfiles.get(0).assertAttributeEquals(SelectHiveQL.RESULT_ROW_COUNT, "2");
final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
}
@Test
@ -325,6 +336,10 @@ public class TestSelectHiveQL {
TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, preQueries, postQueries);
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS);
MockFlowFile flowFile = flowfiles.get(0);
@ -365,8 +380,14 @@ public class TestSelectHiveQL {
}
}
}
final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
assertEquals(durationTime, fetchTime + executionTime);
flowFile.assertAttributeEquals(AbstractHiveQLProcessor.ATTR_INPUT_TABLES, "persons");
}
@ -450,10 +471,20 @@ public class TestSelectHiveQL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
//ensure all but the last file have 9 records each
for (int ff = 0; ff < 11; ff++) {
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
@ -464,6 +495,12 @@ public class TestSelectHiveQL {
//last file should have 1 record
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
@ -504,7 +541,18 @@ public class TestSelectHiveQL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(0);
final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
// Assert the attributes from the incoming flow file are preserved in the outgoing flow file(s)
flowFile.assertAttributeEquals("hiveql.args.1.value", "1");
flowFile.assertAttributeEquals("hiveql.args.1.type", String.valueOf(Types.INTEGER));
@ -544,10 +592,20 @@ public class TestSelectHiveQL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
//ensure all but the last file have 9 records (10 lines = 9 records + header) each
for (int ff = 0; ff < 11; ff++) {
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals(10, br.lines().count());
@ -559,6 +617,12 @@ public class TestSelectHiveQL {
//last file should have 1 record (2 lines = 1 record + header)
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals(2, br.lines().count());
@ -599,9 +663,19 @@ public class TestSelectHiveQL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, maxFragments);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
for (int i = 0; i < maxFragments; i++) {
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(i);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));

View File

@ -80,6 +80,12 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."),
@WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."),
@WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query."),
@WritesAttribute(attribute = "selecthiveql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds. "
+ "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
@WritesAttribute(attribute = "selecthiveql.query.executiontime", description = "Duration of the query execution time in milliseconds. "
+ "This number will reflect the query execution time regardless of the 'Max Rows Per Flow File' setting."),
@WritesAttribute(attribute = "selecthiveql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds. "
+ "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
@ -94,6 +100,9 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
public class SelectHive3QL extends AbstractHive3QLProcessor {
static final String RESULT_ROW_COUNT = "selecthiveql.row.count";
public static final String RESULT_QUERY_DURATION = "selecthiveql.query.duration";
public static final String RESULT_QUERY_EXECUTION_TIME = "selecthiveql.query.executiontime";
public static final String RESULT_QUERY_FETCH_TIME = "selecthiveql.query.fetchtime";
// Relationships
static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -375,6 +384,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
}
}
final StopWatch executionTime = new StopWatch(true);
final ResultSet resultSet;
try {
@ -385,11 +395,14 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
fileToProcess = null;
throw se;
}
long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
int fragmentIndex = 0;
String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
while (true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
final StopWatch fetchTime = new StopWatch(true);
flowfile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
if (baseFilename == null) {
baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
@ -415,6 +428,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
resultSetFlowFiles.add(flowfile);
throw e;
}
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) {
final Map<String, String> attributes = new HashMap<>();
@ -443,6 +457,10 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
attributes.put("fragment.index", String.valueOf(fragmentIndex));
}
attributes.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
attributes.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
attributes.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
flowfile = session.putAllAttributes(flowfile, attributes);
logger.info("{} contains {} " + outputFormat + " records; transferring to 'success'",

View File

@ -174,7 +174,18 @@ public class TestSelectHive3QL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHive3QL.RESULT_ROW_COUNT, "2");
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
flowfiles.get(0).assertAttributeEquals(SelectHive3QL.RESULT_ROW_COUNT, "2");
final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
}
@Test
@ -325,6 +336,10 @@ public class TestSelectHive3QL {
TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, preQueries, postQueries);
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
MockFlowFile flowFile = flowfiles.get(0);
@ -365,8 +380,14 @@ public class TestSelectHive3QL {
}
}
}
final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHive3QL.RESULT_ROW_COUNT)));
assertEquals(durationTime, fetchTime + executionTime);
flowFile.assertAttributeEquals(AbstractHive3QLProcessor.ATTR_INPUT_TABLES, "persons");
}
@ -450,10 +471,20 @@ public class TestSelectHive3QL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
//ensure all but the last file have 9 records each
for (int ff = 0; ff < 11; ff++) {
mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
@ -464,6 +495,12 @@ public class TestSelectHive3QL {
//last file should have 1 record
mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
@ -504,7 +541,18 @@ public class TestSelectHive3QL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
// Assert the attributes from the incoming flow file are preserved in the outgoing flow file(s)
flowFile.assertAttributeEquals("hiveql.args.1.value", "1");
flowFile.assertAttributeEquals("hiveql.args.1.type", String.valueOf(Types.INTEGER));
@ -544,10 +592,20 @@ public class TestSelectHive3QL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
//ensure all but the last file have 9 records (10 lines = 9 records + header) each
for (int ff = 0; ff < 11; ff++) {
mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals(10, br.lines().count());
@ -559,6 +617,12 @@ public class TestSelectHive3QL {
//last file should have 1 record (2 lines = 1 record + header)
mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals(2, br.lines().count());
@ -599,9 +663,19 @@ public class TestSelectHive3QL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, maxFragments);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
for (int i = 0; i < maxFragments; i++) {
mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(i);
final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));