diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index 3b8576b22e..7d7bf0b5d8 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -80,6 +80,12 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
         @WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."),
         @WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."),
         @WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query."),
+        @WritesAttribute(attribute = "selecthiveql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds. "
+                + "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
+        @WritesAttribute(attribute = "selecthiveql.query.executiontime", description = "Duration of the query execution time in milliseconds. "
+                + "This number will reflect the query execution time regardless of the 'Max Rows Per Flow File' setting."),
+        @WritesAttribute(attribute = "selecthiveql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds. "
+                + "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
         @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
                 + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
@@ -94,6 +100,9 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 public class SelectHiveQL extends AbstractHiveQLProcessor {
 
     public static final String RESULT_ROW_COUNT = "selecthiveql.row.count";
+    public static final String RESULT_QUERY_DURATION = "selecthiveql.query.duration";
+    public static final String RESULT_QUERY_EXECUTION_TIME = "selecthiveql.query.executiontime";
+    public static final String RESULT_QUERY_FETCH_TIME = "selecthiveql.query.fetchtime";
 
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -372,6 +381,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                 }
             }
 
+            final StopWatch executionTime = new StopWatch(true);
             final ResultSet resultSet;
 
             try {
@@ -382,11 +392,14 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                 fileToProcess = null;
                 throw se;
             }
+            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
 
             int fragmentIndex = 0;
             String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
             while (true) {
                 final AtomicLong nrOfRows = new AtomicLong(0L);
+                final StopWatch fetchTime = new StopWatch(true);
+
                 flowfile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                 if (baseFilename == null) {
                     baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
@@ -412,6 +425,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                     resultSetFlowFiles.add(flowfile);
                     throw e;
                 }
+                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
 
                 if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) {
                     final Map<String, String> attributes = new HashMap<>();
@@ -440,6 +454,10 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                         attributes.put("fragment.index", String.valueOf(fragmentIndex));
                     }
 
+                    attributes.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                    attributes.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                    attributes.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+
                     flowfile = session.putAllAttributes(flowfile, attributes);
 
                     logger.info("{} contains {} " + outputFormat + " records; transferring to 'success'",
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
index bb8448654f..e0924fb730 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
@@ -174,7 +174,18 @@ public class TestSelectHiveQL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
-        runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHiveQL.RESULT_ROW_COUNT, "2");
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
+
+        final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS);
+        flowfiles.get(0).assertAttributeEquals(SelectHiveQL.RESULT_ROW_COUNT, "2");
+        final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
     }
 
     @Test
@@ -325,6 +336,10 @@ public class TestSelectHiveQL {
         TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, preQueries, postQueries);
 
         runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
 
         final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS);
         MockFlowFile flowFile = flowfiles.get(0);
@@ -365,8 +380,14 @@ public class TestSelectHiveQL {
                 }
             }
         }
+
+        final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
         assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
         assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
+        assertEquals(durationTime, fetchTime + executionTime);
         flowFile.assertAttributeEquals(AbstractHiveQLProcessor.ATTR_INPUT_TABLES, "persons");
     }
 
@@ -450,10 +471,20 @@ public class TestSelectHiveQL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
 
         //ensure all but the last file have 9 records each
         for (int ff = 0; ff < 11; ff++) {
             mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
+            final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+            final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+            final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
+            assertEquals(durationTime, fetchTime + executionTime);
+
             in = new ByteArrayInputStream(mff.toByteArray());
             assertEquals(9, getNumberOfRecordsFromStream(in));
 
@@ -464,6 +495,12 @@ public class TestSelectHiveQL {
 
         //last file should have 1 record
         mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
+        final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
+
         in = new ByteArrayInputStream(mff.toByteArray());
         assertEquals(1, getNumberOfRecordsFromStream(in));
         mff.assertAttributeExists("fragment.identifier");
@@ -504,7 +541,18 @@ public class TestSelectHiveQL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
+
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(0);
+        final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
+
         // Assert the attributes from the incoming flow file are preserved in the outgoing flow file(s)
         flowFile.assertAttributeEquals("hiveql.args.1.value", "1");
         flowFile.assertAttributeEquals("hiveql.args.1.type", String.valueOf(Types.INTEGER));
@@ -544,10 +592,20 @@ public class TestSelectHiveQL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
 
         //ensure all but the last file have 9 records (10 lines = 9 records + header) each
         for (int ff = 0; ff < 11; ff++) {
             mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
+            final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+            final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+            final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
+            assertEquals(durationTime, fetchTime + executionTime);
+
             in = new ByteArrayInputStream(mff.toByteArray());
             BufferedReader br = new BufferedReader(new InputStreamReader(in));
             assertEquals(10, br.lines().count());
@@ -559,6 +617,12 @@ public class TestSelectHiveQL {
 
         //last file should have 1 record (2 lines = 1 record + header)
         mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
+        final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
+
         in = new ByteArrayInputStream(mff.toByteArray());
         BufferedReader br = new BufferedReader(new InputStreamReader(in));
         assertEquals(2, br.lines().count());
@@ -599,9 +663,19 @@ public class TestSelectHiveQL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, maxFragments);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHiveQL.REL_SUCCESS, SelectHiveQL.RESULT_QUERY_FETCH_TIME);
 
         for (int i = 0; i < maxFragments; i++) {
             mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(i);
+            final long executionTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_EXECUTION_TIME));
+            final long fetchTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_FETCH_TIME));
+            final long durationTime = Long.parseLong(mff.getAttribute(SelectHiveQL.RESULT_QUERY_DURATION));
+
+            assertEquals(durationTime, fetchTime + executionTime);
+
             in = new ByteArrayInputStream(mff.toByteArray());
             assertEquals(9, getNumberOfRecordsFromStream(in));
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
index 3bda93118f..f124c73637 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
@@ -80,6 +80,12 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
         @WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."),
         @WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."),
         @WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query."),
+        @WritesAttribute(attribute = "selecthiveql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds. "
+                + "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
+        @WritesAttribute(attribute = "selecthiveql.query.executiontime", description = "Duration of the query execution time in milliseconds. "
+                + "This number will reflect the query execution time regardless of the 'Max Rows Per Flow File' setting."),
+        @WritesAttribute(attribute = "selecthiveql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds. "
+                + "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
         @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
                 + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
@@ -94,6 +100,9 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 public class SelectHive3QL extends AbstractHive3QLProcessor {
 
     static final String RESULT_ROW_COUNT = "selecthiveql.row.count";
+    public static final String RESULT_QUERY_DURATION = "selecthiveql.query.duration";
+    public static final String RESULT_QUERY_EXECUTION_TIME = "selecthiveql.query.executiontime";
+    public static final String RESULT_QUERY_FETCH_TIME = "selecthiveql.query.fetchtime";
 
     // Relationships
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -375,6 +384,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                 }
             }
 
+            final StopWatch executionTime = new StopWatch(true);
             final ResultSet resultSet;
 
             try {
@@ -385,11 +395,14 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                 fileToProcess = null;
                 throw se;
             }
+            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
 
             int fragmentIndex = 0;
             String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
             while (true) {
                 final AtomicLong nrOfRows = new AtomicLong(0L);
+                final StopWatch fetchTime = new StopWatch(true);
+
                 flowfile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                 if (baseFilename == null) {
                     baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
@@ -415,6 +428,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                     resultSetFlowFiles.add(flowfile);
                     throw e;
                 }
+                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
 
                 if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) {
                     final Map<String, String> attributes = new HashMap<>();
@@ -443,6 +457,10 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                         attributes.put("fragment.index", String.valueOf(fragmentIndex));
                     }
 
+                    attributes.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                    attributes.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                    attributes.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+
                     flowfile = session.putAllAttributes(flowfile, attributes);
 
                     logger.info("{} contains {} " + outputFormat + " records; transferring to 'success'",
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
index 77a20f8c83..eb1b065b40 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
@@ -174,7 +174,18 @@ public class TestSelectHive3QL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
-        runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHive3QL.RESULT_ROW_COUNT, "2");
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
+
+        final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
+        flowfiles.get(0).assertAttributeEquals(SelectHive3QL.RESULT_ROW_COUNT, "2");
+        final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
     }
 
     @Test
@@ -325,6 +336,10 @@ public class TestSelectHive3QL {
         TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, preQueries, postQueries);
 
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
 
         final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
         MockFlowFile flowFile = flowfiles.get(0);
@@ -365,8 +380,14 @@ public class TestSelectHive3QL {
                 }
             }
         }
+
+        final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
         assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
         assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHive3QL.RESULT_ROW_COUNT)));
+        assertEquals(durationTime, fetchTime + executionTime);
         flowFile.assertAttributeEquals(AbstractHive3QLProcessor.ATTR_INPUT_TABLES, "persons");
     }
 
@@ -450,10 +471,20 @@ public class TestSelectHive3QL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
 
         //ensure all but the last file have 9 records each
         for (int ff = 0; ff < 11; ff++) {
             mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff);
+            final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+            final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+            final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
+            assertEquals(durationTime, fetchTime + executionTime);
+
             in = new ByteArrayInputStream(mff.toByteArray());
             assertEquals(9, getNumberOfRecordsFromStream(in));
 
@@ -464,6 +495,12 @@ public class TestSelectHive3QL {
 
         //last file should have 1 record
         mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11);
+        final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
+
         in = new ByteArrayInputStream(mff.toByteArray());
         assertEquals(1, getNumberOfRecordsFromStream(in));
         mff.assertAttributeExists("fragment.identifier");
@@ -504,7 +541,18 @@ public class TestSelectHive3QL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
+
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
+        final long executionTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowFile.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
+
         // Assert the attributes from the incoming flow file are preserved in the outgoing flow file(s)
         flowFile.assertAttributeEquals("hiveql.args.1.value", "1");
         flowFile.assertAttributeEquals("hiveql.args.1.type", String.valueOf(Types.INTEGER));
@@ -544,10 +592,20 @@ public class TestSelectHive3QL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
 
         //ensure all but the last file have 9 records (10 lines = 9 records + header) each
         for (int ff = 0; ff < 11; ff++) {
             mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff);
+            final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+            final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+            final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
+            assertEquals(durationTime, fetchTime + executionTime);
+
             in = new ByteArrayInputStream(mff.toByteArray());
             BufferedReader br = new BufferedReader(new InputStreamReader(in));
             assertEquals(10, br.lines().count());
@@ -559,6 +617,12 @@ public class TestSelectHive3QL {
 
         //last file should have 1 record (2 lines = 1 record + header)
         mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11);
+        final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
+
         in = new ByteArrayInputStream(mff.toByteArray());
         BufferedReader br = new BufferedReader(new InputStreamReader(in));
         assertEquals(2, br.lines().count());
@@ -599,9 +663,19 @@ public class TestSelectHive3QL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, maxFragments);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_ROW_COUNT);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(SelectHive3QL.REL_SUCCESS, SelectHive3QL.RESULT_QUERY_FETCH_TIME);
 
         for (int i = 0; i < maxFragments; i++) {
             mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(i);
+            final long executionTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_EXECUTION_TIME));
+            final long fetchTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_FETCH_TIME));
+            final long durationTime = Long.parseLong(mff.getAttribute(SelectHive3QL.RESULT_QUERY_DURATION));
+
+            assertEquals(durationTime, fetchTime + executionTime);
+
             in = new ByteArrayInputStream(mff.toByteArray());
             assertEquals(9, getNumberOfRecordsFromStream(in));
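
Reviewer note (not part of the patch): the timing pattern is identical in SelectHiveQL and SelectHive3QL, so a minimal stand-alone sketch of it follows, assuming plain JDBC and NiFi's org.apache.nifi.util.StopWatch (the only StopWatch API used is the auto-start constructor and getElapsed, both visible in the diff). The class name, connection wiring, and single-pass drain below are illustrative stand-ins; the real processors stream each batch of rows into a FlowFile and recreate the fetch watch per fragment, which is why selecthiveql.query.fetchtime covers only the current FlowFile's rows while selecthiveql.query.executiontime repeats unchanged on every fragment.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.util.StopWatch;

// Hypothetical, self-contained illustration of the attribute math introduced above.
public class QueryTimingSketch {

    static Map<String, String> timedQuery(final Connection con, final String query) throws SQLException {
        final Map<String, String> attributes = new HashMap<>();
        try (final Statement st = con.createStatement()) {
            // Phase 1: execution. The watch auto-starts and brackets executeQuery() only.
            final StopWatch executionTime = new StopWatch(true);
            final ResultSet resultSet = st.executeQuery(query);
            final long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);

            // Phase 2: fetch. In the processors this watch is recreated for every
            // 'Max Rows Per Flow File' fragment, so it times only that fragment's rows.
            final StopWatch fetchTime = new StopWatch(true);
            long nrOfRows = 0;
            while (resultSet.next()) { // the processors serialize rows to Avro/CSV here
                nrOfRows++;
            }
            final long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);

            // Same derivation the diff adds: duration is the sum of the two phases.
            attributes.put("selecthiveql.row.count", String.valueOf(nrOfRows));
            attributes.put("selecthiveql.query.duration", String.valueOf(executionTimeElapsed + fetchTimeElapsed));
            attributes.put("selecthiveql.query.executiontime", String.valueOf(executionTimeElapsed));
            attributes.put("selecthiveql.query.fetchtime", String.valueOf(fetchTimeElapsed));
        }
        return attributes;
    }
}

Because the duration attribute is computed as executiontime + fetchtime from the same two long readings, the tests' assertEquals(durationTime, fetchTime + executionTime) holds exactly on every FlowFile, with no rounding slack needed.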