From c998a7259aca73e716433d0582b30d7ed986c4b2 Mon Sep 17 00:00:00 2001 From: "Yolanda M. Davis" Date: Tue, 5 Nov 2019 14:25:58 -0500 Subject: [PATCH] NIFI-6842 - Introduce MetricsEventReportingTask NIFI-6842 - Added AlertHandler for bulletin reporting. Update ReportingTask meta data. NIFI-6842 - corrected display names in action handlers, included metrics option for alert handlers, small refactor in reporting task NIFI-6842 - updated docs and tags NIFI-6842 - Added documentation for handlers. Signed-off-by: Matthew Burgess This closes #3874 --- nifi-assembly/pom.xml | 12 + .../nifi/rules/handlers/AlertHandler.java | 169 +++++++++++ .../rules/handlers/ExpressionHandler.java | 10 +- .../nifi/rules/handlers/LogHandler.java | 29 +- .../rules/handlers/RecordSinkHandler.java | 5 +- ...g.apache.nifi.controller.ControllerService | 3 +- .../additionalDetails.html | 39 +++ .../additionalDetails.html | 38 +++ .../additionalDetails.html | 38 +++ .../additionalDetails.html | 37 +++ .../nifi/rules/handlers/TestAlertHandler.java | 264 ++++++++++++++++++ .../nifi-sql-reporting-tasks/pom.xml | 6 + .../sql/MetricsEventReportingTask.java | 105 +++++++ .../reporting/sql/QueryNiFiReportingTask.java | 84 +----- .../reporting/sql/util/QueryMetricsUtil.java | 114 ++++++++ .../org.apache.nifi.reporting.ReportingTask | 3 +- .../additionalDetails.html | 34 +++ .../sql/TestMetricsEventReportingTask.java | 224 +++++++++++++++ .../sql/TestQueryNiFiReportingTask.java | 21 +- .../MockPropertyContextActionHandler.java | 76 +++++ .../rules/engine/MockRulesEngineService.java | 47 ++++ 21 files changed, 1257 insertions(+), 101 deletions(-) create mode 100644 nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java create mode 100644 nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java create mode 100644 nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index f3381a776f..b036e6538f 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -878,6 +878,18 @@ language governing permissions and limitations under the License. --> 1.11.0-SNAPSHOT nar + + org.apache.nifi + nifi-easyrules-nar + 1.11.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-rules-action-handler-nar + 1.11.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java new file mode 100644 index 0000000000..234ea3c88a --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.rules.handlers; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.rules.Action; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"rules", "rules engine", "action", "action handler", "logging", "alerts", "bulletins"}) +@CapabilityDescription("Creates alerts as bulletins based on a provided action (usually created by a rules engine). " + + "Action objects executed with this Handler should contain \"category\", \"message\", and \"logLevel\" attributes.") +public class AlertHandler extends AbstractActionHandlerService { + + public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder() + .name("alert-default-log-level") + .displayName("Default Alert Log Level") + .required(true) + .description("The default Log Level that will be used to log an alert message" + + " if a log level was not provided in the received action's attributes.") + .allowableValues(DebugLevels.values()) + .defaultValue("info") + .build(); + + public static final PropertyDescriptor DEFAULT_CATEGORY = new PropertyDescriptor.Builder() + .name("alert-default-category") + .displayName("Default Category") + .required(true) + .description("The default category to use when logging alert message "+ + " if a category was not provided in the received action's attributes.") + .defaultValue("Rules Triggered Alert") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DEFAULT_MESSAGE = new PropertyDescriptor.Builder() + .name("alert-default-message") + .displayName("Default Message") + .required(true) + .description("The default message to include in alert if an alert message was " + + "not provided in the received action's attributes") + .defaultValue("An alert was triggered by a rules-based action.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final PropertyDescriptor INCLUDE_FACTS = new PropertyDescriptor.Builder() + .name("alert-include-facts") + .displayName("Include Fact Data") + .required(true) + .description("If true, the alert message will include the facts which triggered this action. Default is false.") + .defaultValue("true") + .allowableValues("true", "false") + .build(); + + private List properties; + private String defaultCategory; + private String defaultLogLevel; + private String defaultMessage; + private Boolean includeFacts; + + @Override + protected void init(ControllerServiceInitializationContext config) throws InitializationException { + super.init(config); + final List properties = new ArrayList<>(); + properties.add(DEFAULT_LOG_LEVEL); + properties.add(DEFAULT_CATEGORY); + properties.add(DEFAULT_MESSAGE); + properties.add(INCLUDE_FACTS); + this.properties = Collections.unmodifiableList(properties); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase(); + defaultCategory = context.getProperty(DEFAULT_CATEGORY).getValue(); + defaultMessage = context.getProperty(DEFAULT_MESSAGE).getValue(); + includeFacts = context.getProperty(INCLUDE_FACTS).asBoolean(); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void execute(Action action, Map facts) { + throw new UnsupportedOperationException("This method is not supported. The AlertHandler requires a Reporting Context"); + } + + @Override + public void execute(PropertyContext propertyContext, Action action, Map facts) { + ComponentLog logger = getLogger(); + + if (propertyContext instanceof ReportingContext) { + + ReportingContext context = (ReportingContext) propertyContext; + Map attributes = action.getAttributes(); + if (context.getBulletinRepository() != null) { + final String category = attributes.getOrDefault("category", defaultCategory); + final String message = getMessage(attributes.getOrDefault("message", defaultMessage), facts); + final String level = attributes.getOrDefault("severity", attributes.getOrDefault("logLevel", defaultLogLevel)); + Severity severity; + try { + severity = Severity.valueOf(level.toUpperCase()); + } catch (IllegalArgumentException iae) { + severity = Severity.INFO; + } + BulletinRepository bulletinRepository = context.getBulletinRepository(); + bulletinRepository.addBulletin(context.createBulletin(category, severity, message)); + + } else { + logger.warn("Bulletin Repository is not available which is unusual. Cannot send a bulletin."); + } + + } else { + logger.warn("Reporting context was not provided to create bulletins."); + } + + } + + protected String getMessage(String alertMessage, Map facts){ + if (includeFacts) { + final StringBuilder message = new StringBuilder(alertMessage); + final Set fields = facts.keySet(); + message.append("\n"); + message.append("Alert Facts:\n"); + fields.forEach(field -> { + message.append("Field: "); + message.append(field); + message.append(", Value: "); + message.append(facts.get(field)); + message.append("\n"); + }); + return message.toString(); + }else{ + return alertMessage; + } + } + +} diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java index be41a5cf03..711c0ec5d8 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java @@ -39,7 +39,7 @@ import java.util.Map; @Tags({"rules", "rules engine", "action", "action handler", "expression language","MVEL","SpEL"}) @CapabilityDescription("Executes an action containing an expression written in MVEL or SpEL. The action " + -"is usually created by a rules engine. ") +"is usually created by a rules engine. Action objects executed with this Handler should contain \"command\" and \"type\" attributes.") public class ExpressionHandler extends AbstractActionHandlerService { enum ExpresssionType { @@ -47,9 +47,11 @@ public class ExpressionHandler extends AbstractActionHandlerService { } public static final PropertyDescriptor DEFAULT_EXPRESSION_LANGUAGE_TYPE = new PropertyDescriptor.Builder() - .name("Expression Language Type") + .name("default-expression-language-type") + .displayName("Default Expression Language Type") .required(true) - .description("The expression language that should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).") + .description("If an expression language type is not provided as an attribute within an Action, the default expression language that " + + "should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).") .allowableValues(ExpresssionType.values()) .defaultValue("MVEL") .build(); @@ -82,7 +84,7 @@ public class ExpressionHandler extends AbstractActionHandlerService { final String command = attributes.get("command"); if(StringUtils.isNotEmpty(command)) { try { - final String type = attributes.get("type"); + final String type = attributes.getOrDefault("type",this.type.toString()); ExpresssionType expresssionType = ExpresssionType.valueOf(type); if (expresssionType.equals(ExpresssionType.MVEL)) { executeMVEL(command, facts); diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java index 666afbce52..20e935139a 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java @@ -38,19 +38,32 @@ import java.util.Map; import java.util.Set; @Tags({"rules", "rules engine", "action", "action handler", "logging"}) -@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)") +@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine). " + + " Action objects executed with this Handler should contain \"logLevel\" and \"message\" attributes.") public class LogHandler extends AbstractActionHandlerService { public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder() - .name("Log Level") + .name("logger-default-log-level") + .displayName("Default Log Level") .required(true) - .description("The Log Level to use when logging the Attributes") + .description("If a log level is not provided as an attribute within an Action, the default log level will be used.") .allowableValues(DebugLevels.values()) .defaultValue("info") .build(); + public static final PropertyDescriptor DEFAULT_LOG_MESSAGE = new PropertyDescriptor.Builder() + .name("logger-default-log-message") + .displayName("Default Log Message") + .required(true) + .description("If a log message is not provided as an attribute within an Action, the default log message will be used.") + .defaultValue("Rules Action Triggered Log.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + private static final PropertyDescriptor LOG_FACTS = new PropertyDescriptor.Builder() - .name("Log Facts") + .name("log-facts") + .displayName("Log Facts") .required(true) .description("If true, the log message will include the facts which triggered this log action.") .defaultValue("true") @@ -58,7 +71,8 @@ public class LogHandler extends AbstractActionHandlerService { .build(); private static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder() - .name("Log prefix") + .name("log-prefix") + .displayName("Log Prefix") .required(false) .description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -69,6 +83,7 @@ public class LogHandler extends AbstractActionHandlerService { private String logPrefix; private Boolean logFacts; private String defaultLogLevel; + private String defaultLogMessage; @Override protected void init(ControllerServiceInitializationContext config) throws InitializationException { @@ -77,6 +92,7 @@ public class LogHandler extends AbstractActionHandlerService { properties.add(LOG_PREFIX); properties.add(LOG_FACTS); properties.add(DEFAULT_LOG_LEVEL); + properties.add(DEFAULT_LOG_MESSAGE); this.properties = Collections.unmodifiableList(properties); } @@ -85,6 +101,7 @@ public class LogHandler extends AbstractActionHandlerService { logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue(); logFacts = context.getProperty(LOG_FACTS).asBoolean(); defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase(); + defaultLogMessage = context.getProperty(DEFAULT_LOG_MESSAGE).evaluateAttributeExpressions().getValue(); } @Override @@ -98,7 +115,7 @@ public class LogHandler extends AbstractActionHandlerService { Map attributes = action.getAttributes(); final String logLevel = attributes.get("logLevel"); final LogLevel level = getLogLevel(logLevel, LogLevel.valueOf(defaultLogLevel)); - final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : "Rules Action Triggered Log."; + final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : defaultLogMessage; final String factsMessage = createFactsLogMessage(facts, eventMessage); logger.log(level, factsMessage); } diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java index 760315e1e8..ee3ca25d4d 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java @@ -43,8 +43,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -@Tags({"rules", "rules engine", "action", "action handler", "logging"}) -@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)") +@Tags({"rules", "rules engine", "action", "action handler", "record", "record sink"}) +@CapabilityDescription("Sends fact information to sink based on a provided action (usually created by a rules engine)." + + " Action objects executed with this Handler should contain \"sendZeroResult\" attribute.") public class RecordSinkHandler extends AbstractActionHandlerService{ static final PropertyDescriptor RECORD_SINK_SERVICE = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 7d2967ff50..461f818f3c 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -15,4 +15,5 @@ org.apache.nifi.rules.handlers.ActionHandlerLookup org.apache.nifi.rules.handlers.ExpressionHandler org.apache.nifi.rules.handlers.LogHandler -org.apache.nifi.rules.handlers.RecordSinkHandler \ No newline at end of file +org.apache.nifi.rules.handlers.RecordSinkHandler +org.apache.nifi.rules.handlers.AlertHandler \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html new file mode 100644 index 0000000000..ea1a49851a --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html @@ -0,0 +1,39 @@ + + + + + + AlertHandler + + + + + +

Summary

+

+ The AlertHandler is used to broadcast alerts (bulletins) as dictated by the action object. Action objects can include attributes to configure + the handler otherwise default values will be used. Possible attribute values are listed below. +

+

ExpressionHandler Service Attributes

+ + + + + +
AttributeDescription
categoryThe category the alert should be grouped under.
logLevelLog Level for the alert. Possible values are trace, debug, info, warn, error.
messageMessage for the alert.
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html new file mode 100644 index 0000000000..8dff1bc188 --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html @@ -0,0 +1,38 @@ + + + + + + ExpressionHandler + + + + + +

Summary

+

+ The ExpressionHandler is used to execute dynamic commands writtin in MVEL or SpEL expression language. Action objects must include attributes to configure + the handler otherwise an exception will be thrown. Possible attribute values are listed below. +

+

ExpressionHandler Service Attributes

+ + + + +
AttributeDescription
typeThe expression language type of the command to be executed. Possible values are MVEL and SpEl (MVEL will be applied by default if type is not provided).
commandThe expression language command that should be executed
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html new file mode 100644 index 0000000000..b9b487da23 --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html @@ -0,0 +1,38 @@ + + + + + + LogHandler + + + + + +

Summary

+

+ The LogHandler is used to execute actions that dictate to log a message and/or metrics. LogHandler can be invoked with any Action object. + Action objects can include attributes to configure the LogHandler or rely on the handler's default settings. Possible attribute values are listed below. +

+

LogHandler Service Attributes

+ + + + +
AttributeDescription
logLevelLog Level for logged message. Possible values are trace, debug, info, warn, error.
messageMessage for log.
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html new file mode 100644 index 0000000000..4633f37a40 --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html @@ -0,0 +1,37 @@ + + + + + + RecordSinkHandler + + + + + +

Summary

+

+ The RecordSinkHandler is used to execute actions that send metrics information to a configured sink. RecordSinkHandler can be invoked with any Action object. + Action objects can include attributes to configure the handler. Possible attribute values are listed below. +

+

RecordSinkHandler Service Attributes

+ + + +
AttributeDescription
sendZeroResultsAllow empty results to be sent to sink. Possible values are true and false (default is false).
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java new file mode 100644 index 0000000000..5fda9de527 --- /dev/null +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.rules.handlers; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinFactory; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.rules.Action; +import org.apache.nifi.util.MockBulletinRepository; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyString; + +public class TestAlertHandler { + + private TestRunner runner; + private MockComponentLog mockComponentLog; + private ReportingContext reportingContext; + private AlertHandler alertHandler; + private MockAlertBulletinRepository mockAlertBulletinRepository; + + @Before + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(TestProcessor.class); + mockComponentLog = new MockComponentLog(); + AlertHandler handler = new MockAlertHandler(mockComponentLog); + mockAlertBulletinRepository = new MockAlertBulletinRepository(); + runner.addControllerService("MockAlertHandler", handler); + runner.enableControllerService(handler); + alertHandler = (AlertHandler) runner.getProcessContext() + .getControllerServiceLookup() + .getControllerService("MockAlertHandler"); + reportingContext = Mockito.mock(ReportingContext.class); + Mockito.when(reportingContext.getBulletinRepository()).thenReturn(mockAlertBulletinRepository); + Mockito.when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString())) + .thenAnswer(invocation -> + BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2))); + } + + @Test + public void testValidService() { + runner.assertValid(alertHandler); + assertThat(alertHandler, instanceOf(AlertHandler.class)); + } + + @Test + public void testAlertNoReportingContext() { + + final Map attributes = new HashMap<>(); + final Map metrics = new HashMap<>(); + + attributes.put("logLevel", "INFO"); + attributes.put("message", "This should be not sent as an alert!"); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + try { + alertHandler.execute(action, metrics); + fail(); + } catch (UnsupportedOperationException ex) { + assertTrue(true); + } + } + + @Test + public void testAlertWithBulletinLevel() { + + final Map attributes = new HashMap<>(); + final Map metrics = new HashMap<>(); + + final String category = "Rules Alert"; + final String message = "This should be sent as an alert!"; + final String severity = "INFO"; + attributes.put("category", category); + attributes.put("message", message); + attributes.put("severity", severity); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final String expectedOutput = "This should be sent as an alert!\n" + + "Alert Facts:\n" + + "Field: cpu, Value: 90\n" + + "Field: jvmHeap, Value: 1000000\n"; + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + alertHandler.execute(reportingContext, action, metrics); + BulletinRepository bulletinRepository = reportingContext.getBulletinRepository(); + List bulletins = bulletinRepository.findBulletinsForController(); + assertFalse(bulletins.isEmpty()); + Bulletin bulletin = bulletins.get(0); + assertEquals(bulletin.getCategory(), category); + assertEquals(bulletin.getMessage(), expectedOutput); + assertEquals(bulletin.getLevel(), severity); + } + + @Test + public void testAlertWithDefaultValues() { + + final Map attributes = new HashMap<>(); + final Map metrics = new HashMap<>(); + + final String category = "Rules Triggered Alert"; + final String message = "An alert was triggered by a rules based action."; + final String severity = "INFO"; + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final String expectedOutput = "An alert was triggered by a rules-based action.\n" + + "Alert Facts:\n" + + "Field: cpu, Value: 90\n" + + "Field: jvmHeap, Value: 1000000\n"; + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + alertHandler.execute(reportingContext, action, metrics); + BulletinRepository bulletinRepository = reportingContext.getBulletinRepository(); + List bulletins = bulletinRepository.findBulletinsForController(); + assertFalse(bulletins.isEmpty()); + Bulletin bulletin = bulletins.get(0); + assertEquals(bulletin.getCategory(), category); + assertEquals(bulletin.getMessage(), expectedOutput); + assertEquals(bulletin.getLevel(), severity); + } + + @Test + public void testInvalidContext(){ + final Map attributes = new HashMap<>(); + final Map metrics = new HashMap<>(); + + final String category = "Rules Alert"; + final String message = "This should be sent as an alert!"; + final String severity = "INFO"; + attributes.put("category", category); + attributes.put("message", message); + attributes.put("severity", severity); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + PropertyContext fakeContext = new PropertyContext() { + @Override + public PropertyValue getProperty(PropertyDescriptor descriptor) { + return null; + } + + @Override + public Map getAllProperties() { + return null; + } + }; + alertHandler.execute(fakeContext, action, metrics); + final String debugMessage = mockComponentLog.getWarnMessage(); + assertTrue(StringUtils.isNotEmpty(debugMessage)); + assertEquals(debugMessage,"Reporting context was not provided to create bulletins."); + } + + @Test + public void testEmptyBulletinRepository(){ + final Map attributes = new HashMap<>(); + final Map metrics = new HashMap<>(); + + final String category = "Rules Alert"; + final String message = "This should be sent as an alert!"; + final String severity = "INFO"; + attributes.put("category", category); + attributes.put("message", message); + attributes.put("severity", severity); + metrics.put("jvmHeap", "1000000"); + metrics.put("cpu", "90"); + + final Action action = new Action(); + action.setType("ALERT"); + action.setAttributes(attributes); + ReportingContext fakeContext = Mockito.mock(ReportingContext.class); + Mockito.when(reportingContext.getBulletinRepository()).thenReturn(null); + alertHandler.execute(fakeContext, action, metrics); + final String debugMessage = mockComponentLog.getWarnMessage(); + assertTrue(StringUtils.isNotEmpty(debugMessage)); + assertEquals(debugMessage,"Bulletin Repository is not available which is unusual. Cannot send a bulletin."); + } + + private static class MockAlertHandler extends AlertHandler { + + private ComponentLog testLogger; + + public MockAlertHandler(ComponentLog testLogger) { + this.testLogger = testLogger; + } + + @Override + protected ComponentLog getLogger() { + return testLogger; + } + + } + + private static class MockAlertBulletinRepository extends MockBulletinRepository { + + List bulletinList; + + + public MockAlertBulletinRepository() { + bulletinList = new ArrayList<>(); + } + + @Override + public void addBulletin(Bulletin bulletin) { + bulletinList.add(bulletin); + } + + @Override + public List findBulletinsForController() { + return bulletinList; + } + + } + +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml index 1b00ce7289..7cb3179718 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml @@ -81,6 +81,12 @@ 1.11.0-SNAPSHOT test + + org.apache.nifi + nifi-rules-engine-service-api + 1.11.0-SNAPSHOT + provided + diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java new file mode 100644 index 0000000000..5254abeae1 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.sql; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; +import org.apache.nifi.rules.Action; +import org.apache.nifi.rules.PropertyContextActionHandler; +import org.apache.nifi.rules.engine.RulesEngineService; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Tags({"reporting", "rules", "action", "action handler", "status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"}) +@CapabilityDescription("Triggers rules-driven actions based on metrics values ") +public class MetricsEventReportingTask extends AbstractReportingTask { + + private List properties; + private MetricsQueryService metricsQueryService; + private volatile RulesEngineService rulesEngineService; + private volatile PropertyContextActionHandler actionHandler; + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected void init(final ReportingInitializationContext config) { + metricsQueryService = new MetricsSqlQueryService(getLogger()); + final List properties = new ArrayList<>(); + properties.add(QueryMetricsUtil.QUERY); + properties.add(QueryMetricsUtil.RULES_ENGINE); + properties.add(QueryMetricsUtil.ACTION_HANDLER); + this.properties = Collections.unmodifiableList(properties); + } + + @OnScheduled + public void setup(final ConfigurationContext context) throws IOException { + actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class); + rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class); + } + + @Override + public void onTrigger(ReportingContext context) { + try { + final String query = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue(); + fireRules(context, actionHandler, rulesEngineService, query); + } catch (Exception e) { + getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e); + } + } + + private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception { + QueryResult queryResult = metricsQueryService.query(context, query); + getLogger().debug("Executing query: {}", new Object[]{ query }); + ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult); + Record record; + try { + while ((record = recordSet.next()) != null) { + final Map facts = new HashMap<>(); + for (String fieldName : record.getRawFieldNames()) { + facts.put(fieldName, record.getValue(fieldName)); + } + List actions = engine.fireRules(facts); + if(actions == null || actions.isEmpty()){ + getLogger().debug("No actions required for provided facts."); + } else { + actions.forEach(action -> { + actionHandler.execute(context, action,facts); + }); + } + } + } finally { + metricsQueryService.closeQuietly(recordSet); + } + } +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java index ae0e326e12..6f3aa9ef19 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java @@ -16,21 +16,16 @@ */ package org.apache.nifi.reporting.sql; -import org.apache.calcite.config.Lex; -import org.apache.calcite.sql.parser.SqlParser; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.record.sink.RecordSinkService; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; import org.apache.nifi.serialization.record.ResultSetRecordSet; import org.apache.nifi.util.StopWatch; @@ -50,34 +45,6 @@ import java.util.concurrent.TimeUnit; + "query on the table when the capability is disabled will cause an error.") public class QueryNiFiReportingTask extends AbstractReportingTask { - static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder() - .name("sql-reporting-record-sink") - .displayName("Record Destination Service") - .description("Specifies the Controller Service to use for writing out the query result records to some destination.") - .identifiesControllerService(RecordSinkService.class) - .required(true) - .build(); - - static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() - .name("sql-reporting-query") - .displayName("SQL Query") - .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. " - + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the " - + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .addValidator(new SqlValidator()) - .build(); - - static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder() - .name("sql-reporting-include-zero-record-results") - .displayName("Include Zero Record Results") - .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); private List properties; @@ -89,9 +56,9 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { protected void init(final ReportingInitializationContext config) { metricsQueryService = new MetricsSqlQueryService(getLogger()); final List properties = new ArrayList<>(); - properties.add(QUERY); - properties.add(RECORD_SINK); - properties.add(INCLUDE_ZERO_RECORD_RESULTS); + properties.add(QueryMetricsUtil.QUERY); + properties.add(QueryMetricsUtil.RECORD_SINK); + properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS); this.properties = Collections.unmodifiableList(properties); } @@ -102,7 +69,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { @OnScheduled public void setup(final ConfigurationContext context) throws IOException { - recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class); + recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class); recordSinkService.reset(); } @@ -110,7 +77,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { public void onTrigger(ReportingContext context) { final StopWatch stopWatch = new StopWatch(true); try { - final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue(); + final String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue(); final QueryResult queryResult = metricsQueryService.query(context, sql); final ResultSetRecordSet recordSet; @@ -129,7 +96,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { attributes.put("reporting.task.name", getName()); attributes.put("reporting.task.uuid", getIdentifier()); attributes.put("reporting.task.type", this.getClass().getSimpleName()); - recordSinkService.sendData(recordSet, attributes, context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean()); + recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean()); } catch (Exception e) { getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e); return; @@ -143,41 +110,4 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { } } - private static class SqlValidator implements Validator { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if (context.isExpressionLanguagePresent(input)) { - return new ValidationResult.Builder() - .input(input) - .subject(subject) - .valid(true) - .explanation("Expression Language Present") - .build(); - } - - final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); - - final SqlParser.Config config = SqlParser.configBuilder() - .setLex(Lex.MYSQL_ANSI) - .build(); - - final SqlParser parser = SqlParser.create(substituted, config); - try { - parser.parseStmt(); - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(true) - .build(); - } catch (final Exception e) { - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(false) - .explanation("Not a valid SQL Statement: " + e.getMessage()) - .build(); - } - } - } - } diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java new file mode 100644 index 0000000000..159daecd88 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.sql.util; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.rules.PropertyContextActionHandler; +import org.apache.nifi.rules.engine.RulesEngineService; + +public class QueryMetricsUtil { + + public static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder() + .name("sql-reporting-record-sink") + .displayName("Record Destination Service") + .description("Specifies the Controller Service to use for writing out the query result records to some destination.") + .identifiesControllerService(RecordSinkService.class) + .required(true) + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("sql-reporting-query") + .displayName("SQL Query") + .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. " + + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the " + + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(new SqlValidator()) + .build(); + + public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder() + .name("sql-reporting-include-zero-record-results") + .displayName("Include Zero Record Results") + .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor RULES_ENGINE = new PropertyDescriptor.Builder() + .name("rules-engine-service") + .displayName("Rules Engine Service") + .description("Specifies the Controller Service to use for applying rules to metrics.") + .identifiesControllerService(RulesEngineService.class) + .required(true) + .build(); + + public static final PropertyDescriptor ACTION_HANDLER = new PropertyDescriptor.Builder() + .name("action-handler") + .displayName("Event Action Handler") + .description("Handler that will execute the defined action returned from rules engine (if Action type is supported by the handler)") + .identifiesControllerService(PropertyContextActionHandler.class) + .required(true) + .build(); + + public static class SqlValidator implements Validator { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Expression Language Present") + .build(); + } + + final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); + + final SqlParser.Config config = SqlParser.configBuilder() + .setLex(Lex.MYSQL_ANSI) + .build(); + + final SqlParser parser = SqlParser.create(substituted, config); + try { + parser.parseStmt(); + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(true) + .build(); + } catch (final Exception e) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Not a valid SQL Statement: " + e.getMessage()) + .build(); + } + } + } + +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask index ec340e8139..c3f5883710 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.reporting.sql.QueryNiFiReportingTask \ No newline at end of file +org.apache.nifi.reporting.sql.QueryNiFiReportingTask +org.apache.nifi.reporting.sql.MetricsEventReportingTask \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html new file mode 100644 index 0000000000..2392aab231 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html @@ -0,0 +1,34 @@ + + + + + + Metrics Event Reporting Task + + + + + +

Summary

+

+ This reporting task can be used to issue SQL queries against various NiFi metrics information, submit returned data to a rules engine (which will determine if any actions should be performed) + and execute the prescribed actions using action handlers. This task requires a RulesEngineService (which will identify any actions that should be performed) and an ActionHandler which will execute the action(s). + A distinct ActionHandler can be used to service all events or an ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally action handler should be associated with the expected action types + returned from the rules engine. +

+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java new file mode 100644 index 0000000000..83992e497c --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.sql; + +import com.google.common.collect.Lists; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; +import org.apache.nifi.rules.Action; +import org.apache.nifi.rules.MockPropertyContextActionHandler; +import org.apache.nifi.rules.PropertyContextActionHandler; +import org.apache.nifi.rules.engine.MockRulesEngineService; +import org.apache.nifi.rules.engine.RulesEngineService; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockPropertyValue; +import org.apache.nifi.util.Tuple; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; + +public class TestMetricsEventReportingTask { + private ReportingContext context; + private MockMetricsEventReportingTask reportingTask; + private MockPropertyContextActionHandler actionHandler; + private MockRulesEngineService rulesEngineService; + private ProcessGroupStatus status; + + @Before + public void setup() { + status = new ProcessGroupStatus(); + actionHandler = new MockPropertyContextActionHandler(); + status.setId("1234"); + status.setFlowFilesReceived(5); + status.setBytesReceived(10000); + status.setFlowFilesSent(10); + status.setBytesRead(20000L); + status.setBytesSent(20000); + status.setQueuedCount(100); + status.setQueuedContentSize(1024L); + status.setBytesWritten(80000L); + status.setActiveThreadCount(5); + + // create a processor status with processing time + ProcessorStatus procStatus = new ProcessorStatus(); + procStatus.setId("proc"); + procStatus.setProcessingNanos(123456789); + + Collection processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + ConnectionStatusPredictions connectionStatusPredictions = new ConnectionStatusPredictions(); + connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000); + connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000); + connectionStatusPredictions.setNextPredictedQueuedCount(1000000000); + connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L); + + ConnectionStatus root1ConnectionStatus = new ConnectionStatus(); + root1ConnectionStatus.setId("root1"); + root1ConnectionStatus.setQueuedCount(1000); + root1ConnectionStatus.setPredictions(connectionStatusPredictions); + + ConnectionStatus root2ConnectionStatus = new ConnectionStatus(); + root2ConnectionStatus.setId("root2"); + root2ConnectionStatus.setQueuedCount(500); + root2ConnectionStatus.setPredictions(connectionStatusPredictions); + + Collection rootConnectionStatuses = new ArrayList<>(); + rootConnectionStatuses.add(root1ConnectionStatus); + rootConnectionStatuses.add(root2ConnectionStatus); + status.setConnectionStatus(rootConnectionStatuses); + + // create a group status with processing time + ProcessGroupStatus groupStatus1 = new ProcessGroupStatus(); + groupStatus1.setProcessorStatus(processorStatuses); + groupStatus1.setBytesRead(1234L); + + // Create a nested group status with a connection + ProcessGroupStatus groupStatus2 = new ProcessGroupStatus(); + groupStatus2.setProcessorStatus(processorStatuses); + groupStatus2.setBytesRead(12345L); + ConnectionStatus nestedConnectionStatus = new ConnectionStatus(); + nestedConnectionStatus.setId("nested"); + nestedConnectionStatus.setQueuedCount(1001); + Collection nestedConnectionStatuses = new ArrayList<>(); + nestedConnectionStatuses.add(nestedConnectionStatus); + groupStatus2.setConnectionStatus(nestedConnectionStatuses); + Collection nestedGroupStatuses = new ArrayList<>(); + nestedGroupStatuses.add(groupStatus2); + groupStatus1.setProcessGroupStatus(nestedGroupStatuses); + + ProcessGroupStatus groupStatus3 = new ProcessGroupStatus(); + groupStatus3.setBytesRead(1L); + ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus(); + nestedConnectionStatus2.setId("nested2"); + nestedConnectionStatus2.setQueuedCount(3); + Collection nestedConnectionStatuses2 = new ArrayList<>(); + nestedConnectionStatuses2.add(nestedConnectionStatus2); + groupStatus3.setConnectionStatus(nestedConnectionStatuses2); + Collection nestedGroupStatuses2 = new ArrayList<>(); + nestedGroupStatuses2.add(groupStatus3); + + Collection groupStatuses = new ArrayList<>(); + groupStatuses.add(groupStatus1); + groupStatuses.add(groupStatus3); + status.setProcessGroupStatus(groupStatuses); + + } + + @Test + public void testConnectionStatusTable() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + properties.put(QueryMetricsUtil.QUERY, "select connectionId, predictedQueuedCount, predictedTimeToBytesBackpressureMillis from CONNECTION_STATUS_PREDICTIONS"); + reportingTask = initTask(properties); + reportingTask.onTrigger(context); + List> metricsList = actionHandler.getRows(); + List> defaultLogActions = actionHandler.getDefaultActionsByType("LOG"); + List> defaultAlertActions = actionHandler.getDefaultActionsByType("ALERT"); + List propertyContexts = actionHandler.getPropertyContexts(); + assertFalse(metricsList.isEmpty()); + assertEquals(2,defaultLogActions.size()); + assertEquals(2,defaultAlertActions.size()); + assertEquals(4,propertyContexts.size()); + + } + + private MockMetricsEventReportingTask initTask(Map customProperties) throws InitializationException, IOException { + + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final BulletinRepository bulletinRepository = Mockito.mock(BulletinRepository.class); + reportingTask = new MockMetricsEventReportingTask(); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + reportingTask.initialize(initContext); + Map properties = new HashMap<>(); + + for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.putAll(customProperties); + + context = Mockito.mock(ReportingContext.class); + Mockito.when(context.isAnalyticsEnabled()).thenReturn(true); + Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask)); + Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository); + Mockito.when(context.createBulletin(anyString(),any(Severity.class), anyString())).thenReturn(null); + + Mockito.doAnswer((Answer) invocation -> { + final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + final EventAccess eventAccess = Mockito.mock(EventAccess.class); + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); + + final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); + actionHandler = new MockPropertyContextActionHandler(); + Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler); + + Action action1 = new Action(); + action1.setType("LOG"); + Action action2 = new Action(); + action2.setType("ALERT"); + + final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class); + rulesEngineService = new MockRulesEngineService(Lists.newArrayList(action1,action2)); + Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService); + + ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class); + Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue); + Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue); + reportingTask.setup(configContext); + + return reportingTask; + } + + private static final class MockMetricsEventReportingTask extends MetricsEventReportingTask { + + } +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java index 9f4cb0a0ac..eae9e9183e 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java @@ -31,6 +31,7 @@ import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.sql.util.QueryMetricsUtil; import org.apache.nifi.reporting.util.metrics.MetricNames; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockPropertyValue; @@ -138,8 +139,8 @@ public class TestQueryNiFiReportingTask { @Test public void testConnectionStatusTable() throws IOException, InitializationException { final Map properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc"); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); @@ -174,8 +175,8 @@ public class TestQueryNiFiReportingTask { @Test public void testJvmMetricsTable() throws IOException, InitializationException { final Map properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select " + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select " + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT, MetricNames.JVM_THREAD_COUNT, MetricNames.JVM_THREAD_STATES_BLOCKED, @@ -202,8 +203,8 @@ public class TestQueryNiFiReportingTask { @Test public void testProcessGroupStatusTable() throws IOException, InitializationException { final Map properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); reportingTask = initTask(properties); reportingTask.onTrigger(context); @@ -227,8 +228,8 @@ public class TestQueryNiFiReportingTask { @Test public void testNoResults() throws IOException, InitializationException { final Map properties = new HashMap<>(); - properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); - properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); reportingTask = initTask(properties); reportingTask.onTrigger(context); @@ -263,11 +264,11 @@ public class TestQueryNiFiReportingTask { final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); mockRecordSinkService = new MockRecordSinkService(); - Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); + Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService); ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class); - Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); + Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue); reportingTask.setup(configContext); return reportingTask; diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java new file mode 100644 index 0000000000..323317d0b2 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.rules; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.Tuple; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{ + + private List> rows = new ArrayList<>(); + private List> defaultActions = new ArrayList<>(); + private List propertyContexts = new ArrayList<>(); + + + @Override + public void execute(PropertyContext context, Action action, Map facts) { + propertyContexts.add(context); + execute(action, facts); + } + + @Override + public void execute(Action action, Map facts) { + rows.add(facts); + defaultActions.add( new Tuple<>(action.getType(),action)); + } + + + @Override + public void initialize(ControllerServiceInitializationContext context) throws InitializationException { + + } + + public List> getRows() { + return rows; + } + + public List> getDefaultActions() { + return defaultActions; + } + + public List> getDefaultActionsByType(final String type){ + return defaultActions.stream().filter(stringActionTuple -> stringActionTuple + .getKey().equalsIgnoreCase(type)).collect(Collectors.toList()); + } + + public List getPropertyContexts() { + return propertyContexts; + } + + @Override + public String getIdentifier() { + return "MockPropertyContextActionHandler"; + } +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java new file mode 100644 index 0000000000..e3ccc73764 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.rules.engine; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.rules.Action; + +import java.util.List; +import java.util.Map; + +public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService { + private List actions; + + public MockRulesEngineService(List actions) { + this.actions = actions; + } + + @Override + public List fireRules(Map facts) { + return actions; + } + + @Override + public void initialize(ControllerServiceInitializationContext context) throws InitializationException { + } + + @Override + public String getIdentifier() { + return "MockRulesEngineService"; + } +}