NIFI-6842 - Introduce MetricsEventReportingTask

NIFI-6842 - Added AlertHandler for bulletin reporting. Update ReportingTask meta data.

NIFI-6842 - corrected display names in action handlers, included metrics option for alert handlers, small refactor in reporting task

NIFI-6842 - updated docs and tags

NIFI-6842 - Added documentation for handlers.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3874
This commit is contained in:
Yolanda M. Davis 2019-11-05 14:25:58 -05:00 committed by Matthew Burgess
parent eb366c8d0a
commit c998a7259a
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
21 changed files with 1257 additions and 101 deletions

View File

@ -878,6 +878,18 @@ language governing permissions and limitations under the License. -->
<version>1.11.0-SNAPSHOT</version> <version>1.11.0-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-easyrules-nar</artifactId>
<version>1.11.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-rules-action-handler-nar</artifactId>
<version>1.11.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
</profile> </profile>
<profile> <profile>

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.rules.handlers;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.rules.Action;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({"rules", "rules engine", "action", "action handler", "logging", "alerts", "bulletins"})
@CapabilityDescription("Creates alerts as bulletins based on a provided action (usually created by a rules engine). " +
"Action objects executed with this Handler should contain \"category\", \"message\", and \"logLevel\" attributes.")
public class AlertHandler extends AbstractActionHandlerService {
public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder()
.name("alert-default-log-level")
.displayName("Default Alert Log Level")
.required(true)
.description("The default Log Level that will be used to log an alert message" +
" if a log level was not provided in the received action's attributes.")
.allowableValues(DebugLevels.values())
.defaultValue("info")
.build();
public static final PropertyDescriptor DEFAULT_CATEGORY = new PropertyDescriptor.Builder()
.name("alert-default-category")
.displayName("Default Category")
.required(true)
.description("The default category to use when logging alert message "+
" if a category was not provided in the received action's attributes.")
.defaultValue("Rules Triggered Alert")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DEFAULT_MESSAGE = new PropertyDescriptor.Builder()
.name("alert-default-message")
.displayName("Default Message")
.required(true)
.description("The default message to include in alert if an alert message was " +
"not provided in the received action's attributes")
.defaultValue("An alert was triggered by a rules-based action.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final PropertyDescriptor INCLUDE_FACTS = new PropertyDescriptor.Builder()
.name("alert-include-facts")
.displayName("Include Fact Data")
.required(true)
.description("If true, the alert message will include the facts which triggered this action. Default is false.")
.defaultValue("true")
.allowableValues("true", "false")
.build();
private List<PropertyDescriptor> properties;
private String defaultCategory;
private String defaultLogLevel;
private String defaultMessage;
private Boolean includeFacts;
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
super.init(config);
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DEFAULT_LOG_LEVEL);
properties.add(DEFAULT_CATEGORY);
properties.add(DEFAULT_MESSAGE);
properties.add(INCLUDE_FACTS);
this.properties = Collections.unmodifiableList(properties);
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
defaultCategory = context.getProperty(DEFAULT_CATEGORY).getValue();
defaultMessage = context.getProperty(DEFAULT_MESSAGE).getValue();
includeFacts = context.getProperty(INCLUDE_FACTS).asBoolean();
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void execute(Action action, Map<String, Object> facts) {
throw new UnsupportedOperationException("This method is not supported. The AlertHandler requires a Reporting Context");
}
@Override
public void execute(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
ComponentLog logger = getLogger();
if (propertyContext instanceof ReportingContext) {
ReportingContext context = (ReportingContext) propertyContext;
Map<String, String> attributes = action.getAttributes();
if (context.getBulletinRepository() != null) {
final String category = attributes.getOrDefault("category", defaultCategory);
final String message = getMessage(attributes.getOrDefault("message", defaultMessage), facts);
final String level = attributes.getOrDefault("severity", attributes.getOrDefault("logLevel", defaultLogLevel));
Severity severity;
try {
severity = Severity.valueOf(level.toUpperCase());
} catch (IllegalArgumentException iae) {
severity = Severity.INFO;
}
BulletinRepository bulletinRepository = context.getBulletinRepository();
bulletinRepository.addBulletin(context.createBulletin(category, severity, message));
} else {
logger.warn("Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
}
} else {
logger.warn("Reporting context was not provided to create bulletins.");
}
}
protected String getMessage(String alertMessage, Map<String, Object> facts){
if (includeFacts) {
final StringBuilder message = new StringBuilder(alertMessage);
final Set<String> fields = facts.keySet();
message.append("\n");
message.append("Alert Facts:\n");
fields.forEach(field -> {
message.append("Field: ");
message.append(field);
message.append(", Value: ");
message.append(facts.get(field));
message.append("\n");
});
return message.toString();
}else{
return alertMessage;
}
}
}

View File

@ -39,7 +39,7 @@ import java.util.Map;
@Tags({"rules", "rules engine", "action", "action handler", "expression language","MVEL","SpEL"}) @Tags({"rules", "rules engine", "action", "action handler", "expression language","MVEL","SpEL"})
@CapabilityDescription("Executes an action containing an expression written in MVEL or SpEL. The action " + @CapabilityDescription("Executes an action containing an expression written in MVEL or SpEL. The action " +
"is usually created by a rules engine. ") "is usually created by a rules engine. Action objects executed with this Handler should contain \"command\" and \"type\" attributes.")
public class ExpressionHandler extends AbstractActionHandlerService { public class ExpressionHandler extends AbstractActionHandlerService {
enum ExpresssionType { enum ExpresssionType {
@ -47,9 +47,11 @@ public class ExpressionHandler extends AbstractActionHandlerService {
} }
public static final PropertyDescriptor DEFAULT_EXPRESSION_LANGUAGE_TYPE = new PropertyDescriptor.Builder() public static final PropertyDescriptor DEFAULT_EXPRESSION_LANGUAGE_TYPE = new PropertyDescriptor.Builder()
.name("Expression Language Type") .name("default-expression-language-type")
.displayName("Default Expression Language Type")
.required(true) .required(true)
.description("The expression language that should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).") .description("If an expression language type is not provided as an attribute within an Action, the default expression language that " +
"should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).")
.allowableValues(ExpresssionType.values()) .allowableValues(ExpresssionType.values())
.defaultValue("MVEL") .defaultValue("MVEL")
.build(); .build();
@ -82,7 +84,7 @@ public class ExpressionHandler extends AbstractActionHandlerService {
final String command = attributes.get("command"); final String command = attributes.get("command");
if(StringUtils.isNotEmpty(command)) { if(StringUtils.isNotEmpty(command)) {
try { try {
final String type = attributes.get("type"); final String type = attributes.getOrDefault("type",this.type.toString());
ExpresssionType expresssionType = ExpresssionType.valueOf(type); ExpresssionType expresssionType = ExpresssionType.valueOf(type);
if (expresssionType.equals(ExpresssionType.MVEL)) { if (expresssionType.equals(ExpresssionType.MVEL)) {
executeMVEL(command, facts); executeMVEL(command, facts);

View File

@ -38,19 +38,32 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
@Tags({"rules", "rules engine", "action", "action handler", "logging"}) @Tags({"rules", "rules engine", "action", "action handler", "logging"})
@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)") @CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine). " +
" Action objects executed with this Handler should contain \"logLevel\" and \"message\" attributes.")
public class LogHandler extends AbstractActionHandlerService { public class LogHandler extends AbstractActionHandlerService {
public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder() public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder()
.name("Log Level") .name("logger-default-log-level")
.displayName("Default Log Level")
.required(true) .required(true)
.description("The Log Level to use when logging the Attributes") .description("If a log level is not provided as an attribute within an Action, the default log level will be used.")
.allowableValues(DebugLevels.values()) .allowableValues(DebugLevels.values())
.defaultValue("info") .defaultValue("info")
.build(); .build();
public static final PropertyDescriptor DEFAULT_LOG_MESSAGE = new PropertyDescriptor.Builder()
.name("logger-default-log-message")
.displayName("Default Log Message")
.required(true)
.description("If a log message is not provided as an attribute within an Action, the default log message will be used.")
.defaultValue("Rules Action Triggered Log.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private static final PropertyDescriptor LOG_FACTS = new PropertyDescriptor.Builder() private static final PropertyDescriptor LOG_FACTS = new PropertyDescriptor.Builder()
.name("Log Facts") .name("log-facts")
.displayName("Log Facts")
.required(true) .required(true)
.description("If true, the log message will include the facts which triggered this log action.") .description("If true, the log message will include the facts which triggered this log action.")
.defaultValue("true") .defaultValue("true")
@ -58,7 +71,8 @@ public class LogHandler extends AbstractActionHandlerService {
.build(); .build();
private static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder() private static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
.name("Log prefix") .name("log-prefix")
.displayName("Log Prefix")
.required(false) .required(false)
.description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.") .description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -69,6 +83,7 @@ public class LogHandler extends AbstractActionHandlerService {
private String logPrefix; private String logPrefix;
private Boolean logFacts; private Boolean logFacts;
private String defaultLogLevel; private String defaultLogLevel;
private String defaultLogMessage;
@Override @Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException { protected void init(ControllerServiceInitializationContext config) throws InitializationException {
@ -77,6 +92,7 @@ public class LogHandler extends AbstractActionHandlerService {
properties.add(LOG_PREFIX); properties.add(LOG_PREFIX);
properties.add(LOG_FACTS); properties.add(LOG_FACTS);
properties.add(DEFAULT_LOG_LEVEL); properties.add(DEFAULT_LOG_LEVEL);
properties.add(DEFAULT_LOG_MESSAGE);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }
@ -85,6 +101,7 @@ public class LogHandler extends AbstractActionHandlerService {
logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue(); logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue();
logFacts = context.getProperty(LOG_FACTS).asBoolean(); logFacts = context.getProperty(LOG_FACTS).asBoolean();
defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase(); defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
defaultLogMessage = context.getProperty(DEFAULT_LOG_MESSAGE).evaluateAttributeExpressions().getValue();
} }
@Override @Override
@ -98,7 +115,7 @@ public class LogHandler extends AbstractActionHandlerService {
Map<String, String> attributes = action.getAttributes(); Map<String, String> attributes = action.getAttributes();
final String logLevel = attributes.get("logLevel"); final String logLevel = attributes.get("logLevel");
final LogLevel level = getLogLevel(logLevel, LogLevel.valueOf(defaultLogLevel)); final LogLevel level = getLogLevel(logLevel, LogLevel.valueOf(defaultLogLevel));
final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : "Rules Action Triggered Log."; final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : defaultLogMessage;
final String factsMessage = createFactsLogMessage(facts, eventMessage); final String factsMessage = createFactsLogMessage(facts, eventMessage);
logger.log(level, factsMessage); logger.log(level, factsMessage);
} }

View File

@ -43,8 +43,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Tags({"rules", "rules engine", "action", "action handler", "logging"}) @Tags({"rules", "rules engine", "action", "action handler", "record", "record sink"})
@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)") @CapabilityDescription("Sends fact information to sink based on a provided action (usually created by a rules engine)." +
" Action objects executed with this Handler should contain \"sendZeroResult\" attribute.")
public class RecordSinkHandler extends AbstractActionHandlerService{ public class RecordSinkHandler extends AbstractActionHandlerService{
static final PropertyDescriptor RECORD_SINK_SERVICE = new PropertyDescriptor.Builder() static final PropertyDescriptor RECORD_SINK_SERVICE = new PropertyDescriptor.Builder()

View File

@ -16,3 +16,4 @@ org.apache.nifi.rules.handlers.ActionHandlerLookup
org.apache.nifi.rules.handlers.ExpressionHandler org.apache.nifi.rules.handlers.ExpressionHandler
org.apache.nifi.rules.handlers.LogHandler org.apache.nifi.rules.handlers.LogHandler
org.apache.nifi.rules.handlers.RecordSinkHandler org.apache.nifi.rules.handlers.RecordSinkHandler
org.apache.nifi.rules.handlers.AlertHandler

View File

@ -0,0 +1,39 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>AlertHandler</title>
<!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>
The AlertHandler is used to broadcast alerts (bulletins) as dictated by the action object. Action objects can include attributes to configure
the handler otherwise default values will be used. Possible attribute values are listed below.
</p>
<h3>ExpressionHandler Service Attributes</h3>
<table title="AlertHandler Attributes" border="1" width="500">
<tr><th>Attribute</th><th>Description</th></tr>
<tr><td>category</td><td>The category the alert should be grouped under.</td></tr>
<tr><td>logLevel</td><td>Log Level for the alert. Possible values are trace, debug, info, warn, error.</td></tr>
<tr><td>message</td><td>Message for the alert.</td></tr>
</table>
<br/>
</body>
</html>

View File

@ -0,0 +1,38 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ExpressionHandler</title>
<!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>
The ExpressionHandler is used to execute dynamic commands writtin in MVEL or SpEL expression language. Action objects must include attributes to configure
the handler otherwise an exception will be thrown. Possible attribute values are listed below.
</p>
<h3>ExpressionHandler Service Attributes</h3>
<table title="ExpressionHandler Attributes" border="1" width="500">
<tr><th>Attribute</th><th>Description</th></tr>
<tr><td>type</td><td>The expression language type of the command to be executed. Possible values are MVEL and SpEl (MVEL will be applied by default if type is not provided).</td></tr>
<tr><td>command</td><td>The expression language command that should be executed</td></tr>
</table>
<br/>
</body>
</html>

View File

@ -0,0 +1,38 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>LogHandler</title>
<!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>
The LogHandler is used to execute actions that dictate to log a message and/or metrics. LogHandler can be invoked with any Action object.
Action objects can include attributes to configure the LogHandler or rely on the handler's default settings. Possible attribute values are listed below.
</p>
<h3>LogHandler Service Attributes</h3>
<table title="LogHandler Attributes" border="1" width="500">
<tr><th>Attribute</th><th>Description</th></tr>
<tr><td>logLevel</td><td>Log Level for logged message. Possible values are trace, debug, info, warn, error.</td></tr>
<tr><td>message</td><td>Message for log.</td></tr>
</table>
<br/>
</body>
</html>

View File

@ -0,0 +1,37 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>RecordSinkHandler</title>
<!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>
The RecordSinkHandler is used to execute actions that send metrics information to a configured sink. RecordSinkHandler can be invoked with any Action object.
Action objects can include attributes to configure the handler. Possible attribute values are listed below.
</p>
<h3>RecordSinkHandler Service Attributes</h3>
<table title="RecordSinkHandler Attributes" border="1" width="500">
<tr><th>Attribute</th><th>Description</th></tr>
<tr><td>sendZeroResults</td><td>Allow empty results to be sent to sink. Possible values are true and false (default is false).</td></tr>
</table>
<br/>
</body>
</html>

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.rules.handlers;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinFactory;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.rules.Action;
import org.apache.nifi.util.MockBulletinRepository;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
public class TestAlertHandler {
private TestRunner runner;
private MockComponentLog mockComponentLog;
private ReportingContext reportingContext;
private AlertHandler alertHandler;
private MockAlertBulletinRepository mockAlertBulletinRepository;
@Before
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(TestProcessor.class);
mockComponentLog = new MockComponentLog();
AlertHandler handler = new MockAlertHandler(mockComponentLog);
mockAlertBulletinRepository = new MockAlertBulletinRepository();
runner.addControllerService("MockAlertHandler", handler);
runner.enableControllerService(handler);
alertHandler = (AlertHandler) runner.getProcessContext()
.getControllerServiceLookup()
.getControllerService("MockAlertHandler");
reportingContext = Mockito.mock(ReportingContext.class);
Mockito.when(reportingContext.getBulletinRepository()).thenReturn(mockAlertBulletinRepository);
Mockito.when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString()))
.thenAnswer(invocation ->
BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2)));
}
@Test
public void testValidService() {
runner.assertValid(alertHandler);
assertThat(alertHandler, instanceOf(AlertHandler.class));
}
@Test
public void testAlertNoReportingContext() {
final Map<String, String> attributes = new HashMap<>();
final Map<String, Object> metrics = new HashMap<>();
attributes.put("logLevel", "INFO");
attributes.put("message", "This should be not sent as an alert!");
metrics.put("jvmHeap", "1000000");
metrics.put("cpu", "90");
final Action action = new Action();
action.setType("ALERT");
action.setAttributes(attributes);
try {
alertHandler.execute(action, metrics);
fail();
} catch (UnsupportedOperationException ex) {
assertTrue(true);
}
}
@Test
public void testAlertWithBulletinLevel() {
final Map<String, String> attributes = new HashMap<>();
final Map<String, Object> metrics = new HashMap<>();
final String category = "Rules Alert";
final String message = "This should be sent as an alert!";
final String severity = "INFO";
attributes.put("category", category);
attributes.put("message", message);
attributes.put("severity", severity);
metrics.put("jvmHeap", "1000000");
metrics.put("cpu", "90");
final String expectedOutput = "This should be sent as an alert!\n" +
"Alert Facts:\n" +
"Field: cpu, Value: 90\n" +
"Field: jvmHeap, Value: 1000000\n";
final Action action = new Action();
action.setType("ALERT");
action.setAttributes(attributes);
alertHandler.execute(reportingContext, action, metrics);
BulletinRepository bulletinRepository = reportingContext.getBulletinRepository();
List<Bulletin> bulletins = bulletinRepository.findBulletinsForController();
assertFalse(bulletins.isEmpty());
Bulletin bulletin = bulletins.get(0);
assertEquals(bulletin.getCategory(), category);
assertEquals(bulletin.getMessage(), expectedOutput);
assertEquals(bulletin.getLevel(), severity);
}
@Test
public void testAlertWithDefaultValues() {
final Map<String, String> attributes = new HashMap<>();
final Map<String, Object> metrics = new HashMap<>();
final String category = "Rules Triggered Alert";
final String message = "An alert was triggered by a rules based action.";
final String severity = "INFO";
metrics.put("jvmHeap", "1000000");
metrics.put("cpu", "90");
final String expectedOutput = "An alert was triggered by a rules-based action.\n" +
"Alert Facts:\n" +
"Field: cpu, Value: 90\n" +
"Field: jvmHeap, Value: 1000000\n";
final Action action = new Action();
action.setType("ALERT");
action.setAttributes(attributes);
alertHandler.execute(reportingContext, action, metrics);
BulletinRepository bulletinRepository = reportingContext.getBulletinRepository();
List<Bulletin> bulletins = bulletinRepository.findBulletinsForController();
assertFalse(bulletins.isEmpty());
Bulletin bulletin = bulletins.get(0);
assertEquals(bulletin.getCategory(), category);
assertEquals(bulletin.getMessage(), expectedOutput);
assertEquals(bulletin.getLevel(), severity);
}
@Test
public void testInvalidContext(){
final Map<String, String> attributes = new HashMap<>();
final Map<String, Object> metrics = new HashMap<>();
final String category = "Rules Alert";
final String message = "This should be sent as an alert!";
final String severity = "INFO";
attributes.put("category", category);
attributes.put("message", message);
attributes.put("severity", severity);
metrics.put("jvmHeap", "1000000");
metrics.put("cpu", "90");
final Action action = new Action();
action.setType("ALERT");
action.setAttributes(attributes);
PropertyContext fakeContext = new PropertyContext() {
@Override
public PropertyValue getProperty(PropertyDescriptor descriptor) {
return null;
}
@Override
public Map<String, String> getAllProperties() {
return null;
}
};
alertHandler.execute(fakeContext, action, metrics);
final String debugMessage = mockComponentLog.getWarnMessage();
assertTrue(StringUtils.isNotEmpty(debugMessage));
assertEquals(debugMessage,"Reporting context was not provided to create bulletins.");
}
@Test
public void testEmptyBulletinRepository(){
final Map<String, String> attributes = new HashMap<>();
final Map<String, Object> metrics = new HashMap<>();
final String category = "Rules Alert";
final String message = "This should be sent as an alert!";
final String severity = "INFO";
attributes.put("category", category);
attributes.put("message", message);
attributes.put("severity", severity);
metrics.put("jvmHeap", "1000000");
metrics.put("cpu", "90");
final Action action = new Action();
action.setType("ALERT");
action.setAttributes(attributes);
ReportingContext fakeContext = Mockito.mock(ReportingContext.class);
Mockito.when(reportingContext.getBulletinRepository()).thenReturn(null);
alertHandler.execute(fakeContext, action, metrics);
final String debugMessage = mockComponentLog.getWarnMessage();
assertTrue(StringUtils.isNotEmpty(debugMessage));
assertEquals(debugMessage,"Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
}
private static class MockAlertHandler extends AlertHandler {
private ComponentLog testLogger;
public MockAlertHandler(ComponentLog testLogger) {
this.testLogger = testLogger;
}
@Override
protected ComponentLog getLogger() {
return testLogger;
}
}
private static class MockAlertBulletinRepository extends MockBulletinRepository {
List<Bulletin> bulletinList;
public MockAlertBulletinRepository() {
bulletinList = new ArrayList<>();
}
@Override
public void addBulletin(Bulletin bulletin) {
bulletinList.add(bulletin);
}
@Override
public List<Bulletin> findBulletinsForController() {
return bulletinList;
}
}
}

View File

@ -81,6 +81,12 @@
<version>1.11.0-SNAPSHOT</version> <version>1.11.0-SNAPSHOT</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-rules-engine-service-api</artifactId>
<version>1.11.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.rules.Action;
import org.apache.nifi.rules.PropertyContextActionHandler;
import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Tags({"reporting", "rules", "action", "action handler", "status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"})
@CapabilityDescription("Triggers rules-driven actions based on metrics values ")
public class MetricsEventReportingTask extends AbstractReportingTask {
private List<PropertyDescriptor> properties;
private MetricsQueryService metricsQueryService;
private volatile RulesEngineService rulesEngineService;
private volatile PropertyContextActionHandler actionHandler;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected void init(final ReportingInitializationContext config) {
metricsQueryService = new MetricsSqlQueryService(getLogger());
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QueryMetricsUtil.QUERY);
properties.add(QueryMetricsUtil.RULES_ENGINE);
properties.add(QueryMetricsUtil.ACTION_HANDLER);
this.properties = Collections.unmodifiableList(properties);
}
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
}
@Override
public void onTrigger(ReportingContext context) {
try {
final String query = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
fireRules(context, actionHandler, rulesEngineService, query);
} catch (Exception e) {
getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e);
}
}
private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception {
QueryResult queryResult = metricsQueryService.query(context, query);
getLogger().debug("Executing query: {}", new Object[]{ query });
ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
Record record;
try {
while ((record = recordSet.next()) != null) {
final Map<String, Object> facts = new HashMap<>();
for (String fieldName : record.getRawFieldNames()) {
facts.put(fieldName, record.getValue(fieldName));
}
List<Action> actions = engine.fireRules(facts);
if(actions == null || actions.isEmpty()){
getLogger().debug("No actions required for provided facts.");
} else {
actions.forEach(action -> {
actionHandler.execute(context, action,facts);
});
}
}
} finally {
metricsQueryService.closeQuietly(recordSet);
}
}
}

View File

@ -16,21 +16,16 @@
*/ */
package org.apache.nifi.reporting.sql; package org.apache.nifi.reporting.sql;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.record.sink.RecordSinkService; import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.serialization.record.ResultSetRecordSet; import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
@ -50,34 +45,6 @@ import java.util.concurrent.TimeUnit;
+ "query on the table when the capability is disabled will cause an error.") + "query on the table when the capability is disabled will cause an error.")
public class QueryNiFiReportingTask extends AbstractReportingTask { public class QueryNiFiReportingTask extends AbstractReportingTask {
static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
.name("sql-reporting-record-sink")
.displayName("Record Destination Service")
.description("Specifies the Controller Service to use for writing out the query result records to some destination.")
.identifiesControllerService(RecordSinkService.class)
.required(true)
.build();
static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("sql-reporting-query")
.displayName("SQL Query")
.description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
+ "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the "
+ "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SqlValidator())
.build();
static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
.name("sql-reporting-include-zero-record-results")
.displayName("Include Zero Record Results")
.description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
@ -89,9 +56,9 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
protected void init(final ReportingInitializationContext config) { protected void init(final ReportingInitializationContext config) {
metricsQueryService = new MetricsSqlQueryService(getLogger()); metricsQueryService = new MetricsSqlQueryService(getLogger());
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUERY); properties.add(QueryMetricsUtil.QUERY);
properties.add(RECORD_SINK); properties.add(QueryMetricsUtil.RECORD_SINK);
properties.add(INCLUDE_ZERO_RECORD_RESULTS); properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }
@ -102,7 +69,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
@OnScheduled @OnScheduled
public void setup(final ConfigurationContext context) throws IOException { public void setup(final ConfigurationContext context) throws IOException {
recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class); recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
recordSinkService.reset(); recordSinkService.reset();
} }
@ -110,7 +77,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
public void onTrigger(ReportingContext context) { public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
try { try {
final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue(); final String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
final QueryResult queryResult = metricsQueryService.query(context, sql); final QueryResult queryResult = metricsQueryService.query(context, sql);
final ResultSetRecordSet recordSet; final ResultSetRecordSet recordSet;
@ -129,7 +96,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
attributes.put("reporting.task.name", getName()); attributes.put("reporting.task.name", getName());
attributes.put("reporting.task.uuid", getIdentifier()); attributes.put("reporting.task.uuid", getIdentifier());
attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("reporting.task.type", this.getClass().getSimpleName());
recordSinkService.sendData(recordSet, attributes, context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean()); recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
} catch (Exception e) { } catch (Exception e) {
getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e); getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e);
return; return;
@ -143,41 +110,4 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
} }
} }
private static class SqlValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(true)
.explanation("Expression Language Present")
.build();
}
final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
final SqlParser.Config config = SqlParser.configBuilder()
.setLex(Lex.MYSQL_ANSI)
.build();
final SqlParser parser = SqlParser.create(substituted, config);
try {
parser.parseStmt();
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(true)
.build();
} catch (final Exception e) {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Not a valid SQL Statement: " + e.getMessage())
.build();
}
}
}
} }

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql.util;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.rules.PropertyContextActionHandler;
import org.apache.nifi.rules.engine.RulesEngineService;
public class QueryMetricsUtil {
public static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
.name("sql-reporting-record-sink")
.displayName("Record Destination Service")
.description("Specifies the Controller Service to use for writing out the query result records to some destination.")
.identifiesControllerService(RecordSinkService.class)
.required(true)
.build();
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("sql-reporting-query")
.displayName("SQL Query")
.description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
+ "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the "
+ "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SqlValidator())
.build();
public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
.name("sql-reporting-include-zero-record-results")
.displayName("Include Zero Record Results")
.description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor RULES_ENGINE = new PropertyDescriptor.Builder()
.name("rules-engine-service")
.displayName("Rules Engine Service")
.description("Specifies the Controller Service to use for applying rules to metrics.")
.identifiesControllerService(RulesEngineService.class)
.required(true)
.build();
public static final PropertyDescriptor ACTION_HANDLER = new PropertyDescriptor.Builder()
.name("action-handler")
.displayName("Event Action Handler")
.description("Handler that will execute the defined action returned from rules engine (if Action type is supported by the handler)")
.identifiesControllerService(PropertyContextActionHandler.class)
.required(true)
.build();
public static class SqlValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(true)
.explanation("Expression Language Present")
.build();
}
final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
final SqlParser.Config config = SqlParser.configBuilder()
.setLex(Lex.MYSQL_ANSI)
.build();
final SqlParser parser = SqlParser.create(substituted, config);
try {
parser.parseStmt();
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(true)
.build();
} catch (final Exception e) {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Not a valid SQL Statement: " + e.getMessage())
.build();
}
}
}
}

View File

@ -14,3 +14,4 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.reporting.sql.QueryNiFiReportingTask org.apache.nifi.reporting.sql.QueryNiFiReportingTask
org.apache.nifi.reporting.sql.MetricsEventReportingTask

View File

@ -0,0 +1,34 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>Metrics Event Reporting Task</title>
<!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>
This reporting task can be used to issue SQL queries against various NiFi metrics information, submit returned data to a rules engine (which will determine if any actions should be performed)
and execute the prescribed actions using action handlers. This task requires a RulesEngineService (which will identify any actions that should be performed) and an ActionHandler which will execute the action(s).
A distinct ActionHandler can be used to service all events or an ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally action handler should be associated with the expected action types
returned from the rules engine.
</p>
<br/>
</body>
</html>

View File

@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql;
import com.google.common.collect.Lists;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.rules.Action;
import org.apache.nifi.rules.MockPropertyContextActionHandler;
import org.apache.nifi.rules.PropertyContextActionHandler;
import org.apache.nifi.rules.engine.MockRulesEngineService;
import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.Tuple;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
public class TestMetricsEventReportingTask {
private ReportingContext context;
private MockMetricsEventReportingTask reportingTask;
private MockPropertyContextActionHandler actionHandler;
private MockRulesEngineService rulesEngineService;
private ProcessGroupStatus status;
@Before
public void setup() {
status = new ProcessGroupStatus();
actionHandler = new MockPropertyContextActionHandler();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesRead(20000L);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setId("proc");
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
ConnectionStatusPredictions connectionStatusPredictions = new ConnectionStatusPredictions();
connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000);
connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000);
connectionStatusPredictions.setNextPredictedQueuedCount(1000000000);
connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L);
ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
root1ConnectionStatus.setId("root1");
root1ConnectionStatus.setQueuedCount(1000);
root1ConnectionStatus.setPredictions(connectionStatusPredictions);
ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
root2ConnectionStatus.setId("root2");
root2ConnectionStatus.setQueuedCount(500);
root2ConnectionStatus.setPredictions(connectionStatusPredictions);
Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>();
rootConnectionStatuses.add(root1ConnectionStatus);
rootConnectionStatuses.add(root2ConnectionStatus);
status.setConnectionStatus(rootConnectionStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
groupStatus1.setProcessorStatus(processorStatuses);
groupStatus1.setBytesRead(1234L);
// Create a nested group status with a connection
ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
groupStatus2.setProcessorStatus(processorStatuses);
groupStatus2.setBytesRead(12345L);
ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
nestedConnectionStatus.setId("nested");
nestedConnectionStatus.setQueuedCount(1001);
Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>();
nestedConnectionStatuses.add(nestedConnectionStatus);
groupStatus2.setConnectionStatus(nestedConnectionStatuses);
Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
nestedGroupStatuses.add(groupStatus2);
groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
groupStatus3.setBytesRead(1L);
ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
nestedConnectionStatus2.setId("nested2");
nestedConnectionStatus2.setQueuedCount(3);
Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>();
nestedConnectionStatuses2.add(nestedConnectionStatus2);
groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
Collection<ProcessGroupStatus> nestedGroupStatuses2 = new ArrayList<>();
nestedGroupStatuses2.add(groupStatus3);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus1);
groupStatuses.add(groupStatus3);
status.setProcessGroupStatus(groupStatuses);
}
@Test
public void testConnectionStatusTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select connectionId, predictedQueuedCount, predictedTimeToBytesBackpressureMillis from CONNECTION_STATUS_PREDICTIONS");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String,Object>> metricsList = actionHandler.getRows();
List<Tuple<String, Action>> defaultLogActions = actionHandler.getDefaultActionsByType("LOG");
List<Tuple<String, Action>> defaultAlertActions = actionHandler.getDefaultActionsByType("ALERT");
List<PropertyContext> propertyContexts = actionHandler.getPropertyContexts();
assertFalse(metricsList.isEmpty());
assertEquals(2,defaultLogActions.size());
assertEquals(2,defaultAlertActions.size());
assertEquals(4,propertyContexts.size());
}
private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final BulletinRepository bulletinRepository = Mockito.mock(BulletinRepository.class);
reportingTask = new MockMetricsEventReportingTask();
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
reportingTask.initialize(initContext);
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.putAll(customProperties);
context = Mockito.mock(ReportingContext.class);
Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
Mockito.when(context.createBulletin(anyString(),any(Severity.class), anyString())).thenReturn(null);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
actionHandler = new MockPropertyContextActionHandler();
Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);
Action action1 = new Action();
action1.setType("LOG");
Action action2 = new Action();
action2.setType("ALERT");
final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class);
rulesEngineService = new MockRulesEngineService(Lists.newArrayList(action1,action2));
Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue);
Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue);
reportingTask.setup(configContext);
return reportingTask;
}
private static final class MockMetricsEventReportingTask extends MetricsEventReportingTask {
}
}

View File

@ -31,6 +31,7 @@ import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.reporting.util.metrics.MetricNames; import org.apache.nifi.reporting.util.metrics.MetricNames;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.MockPropertyValue;
@ -138,8 +139,8 @@ public class TestQueryNiFiReportingTask {
@Test @Test
public void testConnectionStatusTable() throws IOException, InitializationException { public void testConnectionStatusTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>(); final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc"); properties.put(QueryMetricsUtil.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
reportingTask = initTask(properties); reportingTask = initTask(properties);
reportingTask.onTrigger(context); reportingTask.onTrigger(context);
@ -174,8 +175,8 @@ public class TestQueryNiFiReportingTask {
@Test @Test
public void testJvmMetricsTable() throws IOException, InitializationException { public void testJvmMetricsTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>(); final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryNiFiReportingTask.QUERY, "select " properties.put(QueryMetricsUtil.QUERY, "select "
+ Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT, + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
MetricNames.JVM_THREAD_COUNT, MetricNames.JVM_THREAD_COUNT,
MetricNames.JVM_THREAD_STATES_BLOCKED, MetricNames.JVM_THREAD_STATES_BLOCKED,
@ -202,8 +203,8 @@ public class TestQueryNiFiReportingTask {
@Test @Test
public void testProcessGroupStatusTable() throws IOException, InitializationException { public void testProcessGroupStatusTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>(); final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc"); properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
reportingTask = initTask(properties); reportingTask = initTask(properties);
reportingTask.onTrigger(context); reportingTask.onTrigger(context);
@ -227,8 +228,8 @@ public class TestQueryNiFiReportingTask {
@Test @Test
public void testNoResults() throws IOException, InitializationException { public void testNoResults() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>(); final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink"); properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000"); properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
reportingTask = initTask(properties); reportingTask = initTask(properties);
reportingTask.onTrigger(context); reportingTask.onTrigger(context);
@ -263,11 +264,11 @@ public class TestQueryNiFiReportingTask {
final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
mockRecordSinkService = new MockRecordSinkService(); mockRecordSinkService = new MockRecordSinkService();
Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService); Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class); ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
reportingTask.setup(configContext); reportingTask.setup(configContext);
return reportingTask; return reportingTask;

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.rules;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.Tuple;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{
private List<Map<String, Object>> rows = new ArrayList<>();
private List<Tuple<String,Action>> defaultActions = new ArrayList<>();
private List<PropertyContext> propertyContexts = new ArrayList<>();
@Override
public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
propertyContexts.add(context);
execute(action, facts);
}
@Override
public void execute(Action action, Map<String, Object> facts) {
rows.add(facts);
defaultActions.add( new Tuple<>(action.getType(),action));
}
@Override
public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
}
public List<Map<String, Object>> getRows() {
return rows;
}
public List<Tuple<String, Action>> getDefaultActions() {
return defaultActions;
}
public List<Tuple<String,Action>> getDefaultActionsByType(final String type){
return defaultActions.stream().filter(stringActionTuple -> stringActionTuple
.getKey().equalsIgnoreCase(type)).collect(Collectors.toList());
}
public List<PropertyContext> getPropertyContexts() {
return propertyContexts;
}
@Override
public String getIdentifier() {
return "MockPropertyContextActionHandler";
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.rules.engine;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.rules.Action;
import java.util.List;
import java.util.Map;
public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService {
private List<Action> actions;
public MockRulesEngineService(List<Action> actions) {
this.actions = actions;
}
@Override
public List<Action> fireRules(Map<String, Object> facts) {
return actions;
}
@Override
public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
}
@Override
public String getIdentifier() {
return "MockRulesEngineService";
}
}