From 72e6accc1240f7a4e7e7ee1f658615349e9db0e1 Mon Sep 17 00:00:00 2001
From: Lehel
Date: Wed, 27 Oct 2021 11:12:13 +0200
Subject: [PATCH] NIFI-9327: Added timewindow query to QueryNiFiReportingTask
and MetricsEventReportingTask
---
.../sql/MetricsEventReportingTask.java | 29 +-
.../reporting/sql/QueryNiFiReportingTask.java | 23 +-
.../nifi/reporting/sql/QueryTimeAware.java | 51 +++
.../reporting/sql/util/TrackedQueryTime.java | 35 ++
.../additionalDetails.html | 29 ++
.../additionalDetails.html | 13 +
.../record/sink/MockRecordSinkService.java | 3 +-
.../sql/TestMetricsEventReportingTask.java | 306 ++++++++++++++----
.../sql/TestQueryNiFiReportingTask.java | 276 +++++++++++++---
.../MockPropertyContextActionHandler.java | 27 +-
.../rules/engine/MockRulesEngineService.java | 13 +-
11 files changed, 649 insertions(+), 156 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
index 9a930ef760..c3a0baa47e 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.reporting.sql;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
@@ -31,19 +33,23 @@ import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
@Tags({"reporting", "rules", "action", "action handler", "status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"})
@CapabilityDescription("Triggers rules-driven actions based on metrics values ")
-public class MetricsEventReportingTask extends AbstractReportingTask {
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last execution time so that on restart the task knows where it left off.")
+public class MetricsEventReportingTask extends AbstractReportingTask implements QueryTimeAware {
private List properties;
private MetricsQueryService metricsQueryService;
@@ -67,7 +73,7 @@ public class MetricsEventReportingTask extends AbstractReportingTask {
}
@OnScheduled
- public void setup(final ConfigurationContext context) throws IOException {
+ public void setup(final ConfigurationContext context) {
actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@@ -77,17 +83,20 @@ public class MetricsEventReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(ReportingContext context) {
+ String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
try {
- final String query = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
- fireRules(context, actionHandler, rulesEngineService, query);
+ sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME);
+ sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME);
+
+ fireRules(context, actionHandler, rulesEngineService, sql);
} catch (Exception e) {
getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e);
}
}
private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception {
+ getLogger().debug("Executing query: {}", query);
QueryResult queryResult = metricsQueryService.query(context, query);
- getLogger().debug("Executing query: {}", new Object[]{ query });
ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
Record record;
try {
@@ -97,16 +106,14 @@ public class MetricsEventReportingTask extends AbstractReportingTask {
facts.put(fieldName, record.getValue(fieldName));
}
List actions = engine.fireRules(facts);
- if(actions == null || actions.isEmpty()){
+ if (actions == null || actions.isEmpty()) {
getLogger().debug("No actions required for provided facts.");
} else {
- actions.forEach(action -> {
- actionHandler.execute(context, action,facts);
- });
+ actions.forEach(action -> actionHandler.execute(context, action, facts));
}
}
} finally {
metricsQueryService.closeQuietly(recordSet);
}
}
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
index 367db30978..61edaa2773 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.reporting.sql;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.AbstractReportingTask;
@@ -29,7 +31,6 @@ import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +39,10 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
+import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
@@ -46,7 +51,8 @@ import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFA
+ "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions or capabilities provided by Apache Calcite. Note that the "
+ "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled (see the nifi.analytics.predict.enabled property in nifi.properties). Attempting a "
+ "query on the table when the capability is disabled will cause an error.")
-public class QueryNiFiReportingTask extends AbstractReportingTask {
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last execution time so that on restart the task knows where it left off.")
+public class QueryNiFiReportingTask extends AbstractReportingTask implements QueryTimeAware {
private List properties;
@@ -71,7 +77,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
}
@OnScheduled
- public void setup(final ConfigurationContext context) throws IOException {
+ public void setup(final ConfigurationContext context) {
recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
recordSinkService.reset();
final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@@ -82,13 +88,16 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true);
+ String sql = context.getProperty(QueryMetricsUtil.QUERY).getValue();
try {
- final String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
+ sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME);
+ sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME);
+
+ getLogger().debug("Executing query: {}", sql);
final QueryResult queryResult = metricsQueryService.query(context, sql);
final ResultSetRecordSet recordSet;
try {
- getLogger().debug("Executing query: {}", new Object[]{sql});
recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
} catch (final Exception e) {
getLogger().error("Error creating record set from query results due to {}", new Object[]{e.getMessage()}, e);
@@ -110,10 +119,10 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
metricsQueryService.closeQuietly(queryResult);
}
final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
- getLogger().debug("Successfully queried and sent in {} millis", new Object[]{elapsedMillis});
+ getLogger().debug("Successfully queried and sent in {} millis", elapsedMillis);
} catch (Exception e) {
getLogger().error("Error processing the query due to {}", new Object[]{e.getMessage()}, e);
}
}
-
}
+
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
new file mode 100644
index 0000000000..7d2f0b2390
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+public interface QueryTimeAware {
+
+ default String processStartAndEndTimes(ReportingContext context, String sql, TrackedQueryTime queryStartTime, TrackedQueryTime queryEndTime) throws IOException {
+ StateManager stateManager = context.getStateManager();
+ final Map stateMap = new HashMap<>(stateManager.getState(Scope.LOCAL).toMap());
+
+ if (sql.contains(queryStartTime.getSqlPlaceholder()) && sql.contains(queryEndTime.getSqlPlaceholder())) {
+ final long startTime = stateMap.get(queryStartTime.name()) == null ? 0 : Long.parseLong(stateMap.get(queryStartTime.name()));
+ final long currentTime = getCurrentTime();
+
+ sql = sql.replace(queryStartTime.getSqlPlaceholder(), String.valueOf(startTime));
+ sql = sql.replace(queryEndTime.getSqlPlaceholder(), String.valueOf(currentTime));
+
+ stateMap.put(queryStartTime.name(), String.valueOf(currentTime));
+ stateManager.setState(stateMap, Scope.LOCAL);
+ }
+ return sql;
+ }
+
+ default long getCurrentTime() {
+ return Instant.now().toEpochMilli();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
new file mode 100644
index 0000000000..cf46a291a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.util;
+
+public enum TrackedQueryTime {
+
+ BULLETIN_START_TIME("$bulletinStartTime"),
+ BULLETIN_END_TIME("$bulletinEndTime"),
+ PROVENANCE_START_TIME("$provenanceStartTime"),
+ PROVENANCE_END_TIME("$provenanceEndTime");
+
+ private final String sqlPlaceholder;
+
+ TrackedQueryTime(final String sqlPlaceholder) {
+ this.sqlPlaceholder = sqlPlaceholder;
+ }
+
+ public String getSqlPlaceholder() {
+ return sqlPlaceholder;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
index 2392aab231..f29975f236 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
@@ -29,6 +29,35 @@
A distinct ActionHandler can be used to service all events or an ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally action handler should be associated with the expected action types
returned from the rules engine.
+
+ The reporting task can uniquely handle items from the bulletin and provenance repositories. This means that an item will only be processed once when the query is set to unique.
+ The query can be set to unique by defining a time window with special sql placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime, $provenanceEndTime)
+ that the reporting task will evaluate runtime. See the SQL Query Examples section.
+
+
+SQL Query Examples
+
+ Example: Select all fields from the CONNECTION_STATUS
table:
+
SELECT * FROM CONNECTION_STATUS
+
+
+
+ Example: Select connection IDs where time-to-backpressure (based on queue count) is less than 5 minutes:
+
SELECT connectionId FROM CONNECTION_STATUS_PREDICTIONS WHERE predictedTimeToCountBackpressureMillis < 300000
+
+
+
+ Example: Get the unique bulletin categories associated with errors:
+
SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = "ERROR"
+
+
+ Example: Select all fields from the BULLETINS
table with time window:
+
SELECT * from BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND bulletinTimestamp <= $bulletinEndTime
+
+
+ Example: Select all fields from the PROVENANCE
table with time window:
+
SELECT * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime
+