mirror of https://github.com/apache/nifi.git
NIFI-6855 - added action type enforcement option for handlers
NIFI-6855 - added setting to support ignore, warn or throwing exception for unsupported action types. added EL support for defining types to enforce. NIFI-6855 - fix checkstyle violation Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #3886
This commit is contained in:
parent
d617c0b96a
commit
18245a4441
|
@ -16,22 +16,110 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.rules.handlers;
|
package org.apache.nifi.rules.handlers;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.context.PropertyContext;
|
import org.apache.nifi.context.PropertyContext;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.rules.Action;
|
import org.apache.nifi.rules.Action;
|
||||||
import org.apache.nifi.rules.PropertyContextActionHandler;
|
import org.apache.nifi.rules.PropertyContextActionHandler;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public abstract class AbstractActionHandlerService extends AbstractControllerService implements PropertyContextActionHandler {
|
public abstract class AbstractActionHandlerService extends AbstractControllerService implements PropertyContextActionHandler {
|
||||||
|
|
||||||
public static enum DebugLevels {
|
protected List<String> enforceActionTypes;
|
||||||
|
protected EnforceActionTypeLevel enforceActionTypeLevel;
|
||||||
|
|
||||||
|
public enum DebugLevels {
|
||||||
trace, debug, info, warn, error
|
trace, debug, info, warn, error
|
||||||
}
|
}
|
||||||
public abstract void execute(Action action, Map<String, Object> facts);
|
|
||||||
|
|
||||||
public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
|
public enum EnforceActionTypeLevel {
|
||||||
execute(action, facts);
|
IGNORE, WARN, EXCEPTION
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final PropertyDescriptor ENFORCE_ACTION_TYPE = new PropertyDescriptor.Builder()
|
||||||
|
.name("action-handler-enforce-type")
|
||||||
|
.displayName("Enforce Action Type")
|
||||||
|
.required(false)
|
||||||
|
.description("The Action Type(s) that should be supported by this handler. If provided any other type an " +
|
||||||
|
"exception will be thrown. This can support a comma delimited list of types (e.g. ALERT,LOG)")
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor ENFORCE_ACTION_TYPE_LEVEL = new PropertyDescriptor.Builder()
|
||||||
|
.name("action-handler-enforce-type-level")
|
||||||
|
.displayName("Enforce Level")
|
||||||
|
.required(false)
|
||||||
|
.description("If specific action types are enforced, this setting specifies whether the action should be ignored," +
|
||||||
|
" a warning should be logged or if an exception is thrown. Default is to ignore the received action.")
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.allowableValues(EnforceActionTypeLevel.values())
|
||||||
|
.defaultValue("IGNORE")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public void execute(Action action, Map<String, Object> facts) {
|
||||||
|
if (actionTypeNotSupported(action)) {
|
||||||
|
handleActionEnforcement(action);
|
||||||
|
} else {
|
||||||
|
executeAction(action, facts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
|
||||||
|
if (actionTypeNotSupported(action)) {
|
||||||
|
handleActionEnforcement(action);
|
||||||
|
} else {
|
||||||
|
executeAction(context, action, facts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void executeAction(Action action, Map<String, Object> facts) {
|
||||||
|
throw new UnsupportedOperationException("This method is not supported by this handler.");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void executeAction(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
|
||||||
|
throw new UnsupportedOperationException("This method is not supported by this handler");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean actionTypeNotSupported(Action action) {
|
||||||
|
return enforceActionTypes != null && !enforceActionTypes.contains(action.getType());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void handleActionEnforcement(Action action) {
|
||||||
|
String message = "This Action Handler does not support actions with the provided type: " + action.getType();
|
||||||
|
if (enforceActionTypeLevel.equals(EnforceActionTypeLevel.WARN)) {
|
||||||
|
getLogger().warn(message);
|
||||||
|
} else if (enforceActionTypeLevel.equals(EnforceActionTypeLevel.EXCEPTION)) {
|
||||||
|
throw new UnsupportedOperationException(message);
|
||||||
|
} else if (getLogger().isDebugEnabled()) {
|
||||||
|
getLogger().debug(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnEnabled
|
||||||
|
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
||||||
|
String actionTypes = context.getProperty(ENFORCE_ACTION_TYPE).evaluateAttributeExpressions().getValue();
|
||||||
|
if(StringUtils.isNotEmpty(actionTypes)){
|
||||||
|
enforceActionTypes = Arrays.stream(actionTypes.split(","))
|
||||||
|
.map(String::trim)
|
||||||
|
.filter(StringUtils::isNotEmpty)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
String level = context.getProperty(ENFORCE_ACTION_TYPE_LEVEL).getValue();
|
||||||
|
if(StringUtils.isNotEmpty(level)) {
|
||||||
|
enforceActionTypeLevel = EnforceActionTypeLevel.valueOf(level);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,11 +95,15 @@ public class AlertHandler extends AbstractActionHandlerService {
|
||||||
properties.add(DEFAULT_CATEGORY);
|
properties.add(DEFAULT_CATEGORY);
|
||||||
properties.add(DEFAULT_MESSAGE);
|
properties.add(DEFAULT_MESSAGE);
|
||||||
properties.add(INCLUDE_FACTS);
|
properties.add(INCLUDE_FACTS);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE_LEVEL);
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@OnEnabled
|
@OnEnabled
|
||||||
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
||||||
|
super.onEnabled(context);
|
||||||
defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
|
defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
|
||||||
defaultCategory = context.getProperty(DEFAULT_CATEGORY).getValue();
|
defaultCategory = context.getProperty(DEFAULT_CATEGORY).getValue();
|
||||||
defaultMessage = context.getProperty(DEFAULT_MESSAGE).getValue();
|
defaultMessage = context.getProperty(DEFAULT_MESSAGE).getValue();
|
||||||
|
@ -112,14 +116,8 @@ public class AlertHandler extends AbstractActionHandlerService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Action action, Map<String, Object> facts) {
|
protected void executeAction(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
|
||||||
throw new UnsupportedOperationException("This method is not supported. The AlertHandler requires a Reporting Context");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void execute(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
|
|
||||||
ComponentLog logger = getLogger();
|
ComponentLog logger = getLogger();
|
||||||
|
|
||||||
if (propertyContext instanceof ReportingContext) {
|
if (propertyContext instanceof ReportingContext) {
|
||||||
|
|
||||||
ReportingContext context = (ReportingContext) propertyContext;
|
ReportingContext context = (ReportingContext) propertyContext;
|
||||||
|
@ -144,7 +142,6 @@ public class AlertHandler extends AbstractActionHandlerService {
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Reporting context was not provided to create bulletins.");
|
logger.warn("Reporting context was not provided to create bulletins.");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getMessage(String alertMessage, Map<String, Object> facts){
|
protected String getMessage(String alertMessage, Map<String, Object> facts){
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.context.PropertyContext;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
@ -64,11 +65,15 @@ public class ExpressionHandler extends AbstractActionHandlerService {
|
||||||
super.init(config);
|
super.init(config);
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
properties.add(DEFAULT_EXPRESSION_LANGUAGE_TYPE);
|
properties.add(DEFAULT_EXPRESSION_LANGUAGE_TYPE);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE_LEVEL);
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@OnEnabled
|
@OnEnabled
|
||||||
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
||||||
|
super.onEnabled(context);
|
||||||
type = ExpresssionType.valueOf(context.getProperty(DEFAULT_EXPRESSION_LANGUAGE_TYPE).getValue());
|
type = ExpresssionType.valueOf(context.getProperty(DEFAULT_EXPRESSION_LANGUAGE_TYPE).getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,9 +82,13 @@ public class ExpressionHandler extends AbstractActionHandlerService {
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void executeAction(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
|
||||||
|
executeAction(action, facts);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Action action, Map<String, Object> facts) {
|
protected void executeAction(Action action, Map<String, Object> facts) {
|
||||||
Map<String, String> attributes = action.getAttributes();
|
Map<String, String> attributes = action.getAttributes();
|
||||||
final String command = attributes.get("command");
|
final String command = attributes.get("command");
|
||||||
if(StringUtils.isNotEmpty(command)) {
|
if(StringUtils.isNotEmpty(command)) {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.context.PropertyContext;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
@ -93,11 +94,15 @@ public class LogHandler extends AbstractActionHandlerService {
|
||||||
properties.add(LOG_FACTS);
|
properties.add(LOG_FACTS);
|
||||||
properties.add(DEFAULT_LOG_LEVEL);
|
properties.add(DEFAULT_LOG_LEVEL);
|
||||||
properties.add(DEFAULT_LOG_MESSAGE);
|
properties.add(DEFAULT_LOG_MESSAGE);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE_LEVEL);
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@OnEnabled
|
@OnEnabled
|
||||||
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
||||||
|
super.onEnabled(context);
|
||||||
logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue();
|
logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue();
|
||||||
logFacts = context.getProperty(LOG_FACTS).asBoolean();
|
logFacts = context.getProperty(LOG_FACTS).asBoolean();
|
||||||
defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
|
defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
|
||||||
|
@ -110,7 +115,12 @@ public class LogHandler extends AbstractActionHandlerService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Action action, Map<String, Object> facts) {
|
protected void executeAction(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
|
||||||
|
executeAction(action, facts);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void executeAction(Action action, Map<String, Object> facts) {
|
||||||
ComponentLog logger = getLogger();
|
ComponentLog logger = getLogger();
|
||||||
Map<String, String> attributes = action.getAttributes();
|
Map<String, String> attributes = action.getAttributes();
|
||||||
final String logLevel = attributes.get("logLevel");
|
final String logLevel = attributes.get("logLevel");
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.context.PropertyContext;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.record.sink.RecordSinkService;
|
import org.apache.nifi.record.sink.RecordSinkService;
|
||||||
|
@ -64,6 +65,8 @@ public class RecordSinkHandler extends AbstractActionHandlerService{
|
||||||
super.init(config);
|
super.init(config);
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
properties.add(RECORD_SINK_SERVICE);
|
properties.add(RECORD_SINK_SERVICE);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE);
|
||||||
|
properties.add(ENFORCE_ACTION_TYPE_LEVEL);
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,25 +75,32 @@ public class RecordSinkHandler extends AbstractActionHandlerService{
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@OnEnabled
|
@OnEnabled
|
||||||
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
||||||
|
super.onEnabled(context);
|
||||||
if(context.getProperty(RECORD_SINK_SERVICE).isSet()) {
|
if(context.getProperty(RECORD_SINK_SERVICE).isSet()) {
|
||||||
recordSinkService = context.getProperty(RECORD_SINK_SERVICE).asControllerService(RecordSinkService.class);
|
recordSinkService = context.getProperty(RECORD_SINK_SERVICE).asControllerService(RecordSinkService.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Action action, Map<String, Object> facts) {
|
protected void executeAction(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
|
||||||
|
executeAction(action, facts);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void executeAction(Action action, Map<String, Object> facts) {
|
||||||
Map<String, String> attributes = action.getAttributes();
|
Map<String, String> attributes = action.getAttributes();
|
||||||
boolean sendZeroResults = attributes.containsKey("sentZeroResults") && Boolean.parseBoolean(attributes.get("sendZeroResults"));
|
boolean sendZeroResults = attributes.containsKey("sentZeroResults") && Boolean.parseBoolean(attributes.get("sendZeroResults"));
|
||||||
final RecordSet recordSet = getRecordSet(facts);
|
final RecordSet recordSet = getRecordSet(facts);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
WriteResult result = recordSinkService.sendData(recordSet, attributes, sendZeroResults);
|
WriteResult result = recordSinkService.sendData(recordSet, attributes, sendZeroResults);
|
||||||
if(getLogger().isDebugEnabled() && result != null){
|
if (getLogger().isDebugEnabled() && result != null) {
|
||||||
getLogger().debug("Records written to sink service: {}", new Object[]{result.getRecordCount()});
|
getLogger().debug("Records written to sink service: {}", new Object[]{result.getRecordCount()});
|
||||||
}
|
}
|
||||||
}catch (Exception ex){
|
} catch (Exception ex) {
|
||||||
getLogger().warn("Exception encountered when attempting to send metrics", ex);
|
getLogger().warn("Exception encountered when attempting to send metrics", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,9 +220,132 @@ public class TestAlertHandler {
|
||||||
ReportingContext fakeContext = Mockito.mock(ReportingContext.class);
|
ReportingContext fakeContext = Mockito.mock(ReportingContext.class);
|
||||||
Mockito.when(reportingContext.getBulletinRepository()).thenReturn(null);
|
Mockito.when(reportingContext.getBulletinRepository()).thenReturn(null);
|
||||||
alertHandler.execute(fakeContext, action, metrics);
|
alertHandler.execute(fakeContext, action, metrics);
|
||||||
final String debugMessage = mockComponentLog.getWarnMessage();
|
final String warnMessage = mockComponentLog.getWarnMessage();
|
||||||
|
assertTrue(StringUtils.isNotEmpty(warnMessage));
|
||||||
|
assertEquals(warnMessage,"Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeException(){
|
||||||
|
|
||||||
|
runner.disableControllerService(alertHandler);
|
||||||
|
runner.setProperty(alertHandler, AlertHandler.ENFORCE_ACTION_TYPE, "ALERT");
|
||||||
|
runner.setProperty(alertHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "EXCEPTION");
|
||||||
|
runner.enableControllerService(alertHandler);
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
final String category = "Rules Alert";
|
||||||
|
final String message = "This should be sent as an alert!";
|
||||||
|
final String severity = "INFO";
|
||||||
|
attributes.put("category", category);
|
||||||
|
attributes.put("message", message);
|
||||||
|
attributes.put("severity", severity);
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
alertHandler.execute(reportingContext, action, metrics);
|
||||||
|
fail();
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeWarn(){
|
||||||
|
|
||||||
|
runner.disableControllerService(alertHandler);
|
||||||
|
runner.setProperty(alertHandler, AlertHandler.ENFORCE_ACTION_TYPE, "ALERT");
|
||||||
|
runner.setProperty(alertHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "WARN");
|
||||||
|
runner.enableControllerService(alertHandler);
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
final String category = "Rules Alert";
|
||||||
|
final String message = "This should be sent as an alert!";
|
||||||
|
final String severity = "INFO";
|
||||||
|
attributes.put("category", category);
|
||||||
|
attributes.put("message", message);
|
||||||
|
attributes.put("severity", severity);
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
alertHandler.execute(reportingContext,action, metrics);
|
||||||
|
assertTrue(true);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
final String warnMessage = mockComponentLog.getWarnMessage();
|
||||||
|
assertTrue(StringUtils.isNotEmpty(warnMessage));
|
||||||
|
assertEquals("This Action Handler does not support actions with the provided type: FAKE",warnMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeIgnore(){
|
||||||
|
|
||||||
|
runner.disableControllerService(alertHandler);
|
||||||
|
runner.setProperty(alertHandler, AlertHandler.ENFORCE_ACTION_TYPE, "ALERT");
|
||||||
|
runner.setProperty(alertHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "IGNORE");
|
||||||
|
runner.enableControllerService(alertHandler);
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
final String category = "Rules Alert";
|
||||||
|
final String message = "This should be sent as an alert!";
|
||||||
|
final String severity = "INFO";
|
||||||
|
attributes.put("category", category);
|
||||||
|
attributes.put("message", message);
|
||||||
|
attributes.put("severity", severity);
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
alertHandler.execute(reportingContext,action, metrics);
|
||||||
|
assertTrue(true);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
final String debugMessage = mockComponentLog.getDebugMessage();
|
||||||
assertTrue(StringUtils.isNotEmpty(debugMessage));
|
assertTrue(StringUtils.isNotEmpty(debugMessage));
|
||||||
assertEquals(debugMessage,"Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
|
assertEquals("This Action Handler does not support actions with the provided type: FAKE",debugMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidActionType(){
|
||||||
|
runner.disableControllerService(alertHandler);
|
||||||
|
runner.setProperty(alertHandler, AlertHandler.ENFORCE_ACTION_TYPE, "ALERT, LOG, ");
|
||||||
|
runner.enableControllerService(alertHandler);
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
final String category = "Rules Alert";
|
||||||
|
final String message = "This should be sent as an alert!";
|
||||||
|
final String severity = "INFO";
|
||||||
|
attributes.put("category", category);
|
||||||
|
attributes.put("message", message);
|
||||||
|
attributes.put("severity", severity);
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("ALERT");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
alertHandler.execute(reportingContext,action, metrics);
|
||||||
|
assertTrue(true);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class MockAlertHandler extends AlertHandler {
|
private static class MockAlertHandler extends AlertHandler {
|
||||||
|
|
|
@ -28,9 +28,11 @@ import org.junit.Test;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertEquals;
|
||||||
import static junit.framework.TestCase.assertTrue;
|
import static junit.framework.TestCase.assertTrue;
|
||||||
import static org.hamcrest.core.IsInstanceOf.instanceOf;
|
import static org.hamcrest.core.IsInstanceOf.instanceOf;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class TestExpressionHandler {
|
public class TestExpressionHandler {
|
||||||
|
|
||||||
|
@ -135,6 +137,102 @@ public class TestExpressionHandler {
|
||||||
assertTrue(logMessage.startsWith(expectedMessage));
|
assertTrue(logMessage.startsWith(expectedMessage));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeException() {
|
||||||
|
runner.disableControllerService(expressionHandler);
|
||||||
|
runner.setProperty(expressionHandler, AlertHandler.ENFORCE_ACTION_TYPE, "EXPRESSION");
|
||||||
|
runner.setProperty(expressionHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "EXCEPTION");
|
||||||
|
runner.enableControllerService(expressionHandler);
|
||||||
|
final Map<String,String> attributes = new HashMap<>();
|
||||||
|
final Map<String,Object> metrics = new HashMap<>();
|
||||||
|
attributes.put("type","FAKE");
|
||||||
|
metrics.put("jvmHeap","1000000");
|
||||||
|
metrics.put("cpu","90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes); try {
|
||||||
|
expressionHandler.execute(action, metrics);
|
||||||
|
fail();
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
assertTrue(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeWarning() {
|
||||||
|
runner.disableControllerService(expressionHandler);
|
||||||
|
runner.setProperty(expressionHandler, AlertHandler.ENFORCE_ACTION_TYPE, "EXPRESSION");
|
||||||
|
runner.setProperty(expressionHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "WARN");
|
||||||
|
runner.enableControllerService(expressionHandler);
|
||||||
|
final Map<String,String> attributes = new HashMap<>();
|
||||||
|
final Map<String,Object> metrics = new HashMap<>();
|
||||||
|
attributes.put("type","FAKE");
|
||||||
|
metrics.put("jvmHeap","1000000");
|
||||||
|
metrics.put("cpu","90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes); try {
|
||||||
|
expressionHandler.execute(action, metrics);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String warnMessage = mockComponentLog.getWarnMessage();
|
||||||
|
assertTrue(StringUtils.isNotEmpty(warnMessage));
|
||||||
|
assertEquals("This Action Handler does not support actions with the provided type: FAKE",warnMessage);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeIgnore() {
|
||||||
|
runner.disableControllerService(expressionHandler);
|
||||||
|
runner.setProperty(expressionHandler, AlertHandler.ENFORCE_ACTION_TYPE, "EXPRESSION");
|
||||||
|
runner.setProperty(expressionHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "IGNORE");
|
||||||
|
runner.enableControllerService(expressionHandler);
|
||||||
|
final Map<String,String> attributes = new HashMap<>();
|
||||||
|
final Map<String,Object> metrics = new HashMap<>();
|
||||||
|
attributes.put("type","FAKE");
|
||||||
|
metrics.put("jvmHeap","1000000");
|
||||||
|
metrics.put("cpu","90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes); try {
|
||||||
|
expressionHandler.execute(action, metrics);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String debugMessage = mockComponentLog.getDebugMessage();
|
||||||
|
assertTrue(StringUtils.isNotEmpty(debugMessage));
|
||||||
|
assertEquals("This Action Handler does not support actions with the provided type: FAKE",debugMessage);
|
||||||
|
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testValidActionType() {
|
||||||
|
runner.disableControllerService(expressionHandler);
|
||||||
|
runner.setProperty(expressionHandler, AlertHandler.ENFORCE_ACTION_TYPE, "EXPRESSION");
|
||||||
|
runner.enableControllerService(expressionHandler);
|
||||||
|
final Map<String,String> attributes = new HashMap<>();
|
||||||
|
final Map<String,Object> metrics = new HashMap<>();
|
||||||
|
attributes.put("type","FAKE");
|
||||||
|
metrics.put("jvmHeap","1000000");
|
||||||
|
metrics.put("cpu","90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("EXPRESSION");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
expressionHandler.execute(action, metrics);
|
||||||
|
assertTrue(true);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class MockExpressionHandler extends ExpressionHandler{
|
private static class MockExpressionHandler extends ExpressionHandler{
|
||||||
private ComponentLog testLogger;
|
private ComponentLog testLogger;
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.rules.handlers;
|
package org.apache.nifi.rules.handlers;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
@ -32,6 +33,7 @@ import static junit.framework.TestCase.assertTrue;
|
||||||
import static org.hamcrest.core.IsInstanceOf.instanceOf;
|
import static org.hamcrest.core.IsInstanceOf.instanceOf;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class TestLogHandler {
|
public class TestLogHandler {
|
||||||
|
|
||||||
|
@ -133,6 +135,139 @@ public class TestLogHandler {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeException() {
|
||||||
|
runner.disableControllerService(logHandler);
|
||||||
|
runner.setProperty(logHandler, AlertHandler.ENFORCE_ACTION_TYPE, "LOG");
|
||||||
|
runner.setProperty(logHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "EXCEPTION");
|
||||||
|
runner.enableControllerService(logHandler);
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
attributes.put("logLevel", "FAKE");
|
||||||
|
|
||||||
|
final String expectedMessage = "--------------------------------------------------\n" +
|
||||||
|
"Log Message: Rules Action Triggered Log.\n" +
|
||||||
|
"Log Facts:\n" +
|
||||||
|
"Field: cpu, Value: 90\n" +
|
||||||
|
"Field: jvmHeap, Value: 1000000";
|
||||||
|
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
logHandler.execute(action, metrics);
|
||||||
|
fail();
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
assertTrue(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeWarning() {
|
||||||
|
runner.disableControllerService(logHandler);
|
||||||
|
runner.setProperty(logHandler, AlertHandler.ENFORCE_ACTION_TYPE, "LOG");
|
||||||
|
runner.setProperty(logHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "WARN");
|
||||||
|
runner.enableControllerService(logHandler);
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
attributes.put("logLevel", "FAKE");
|
||||||
|
|
||||||
|
final String expectedMessage = "--------------------------------------------------\n" +
|
||||||
|
"Log Message: Rules Action Triggered Log.\n" +
|
||||||
|
"Log Facts:\n" +
|
||||||
|
"Field: cpu, Value: 90\n" +
|
||||||
|
"Field: jvmHeap, Value: 1000000";
|
||||||
|
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
logHandler.execute(action, metrics);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String warnMessage = mockComponentLog.getWarnMessage();
|
||||||
|
assertTrue(StringUtils.isNotEmpty(warnMessage));
|
||||||
|
TestCase.assertEquals("This Action Handler does not support actions with the provided type: FAKE",warnMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidActionTypeDebug() {
|
||||||
|
runner.disableControllerService(logHandler);
|
||||||
|
runner.setProperty(logHandler, AlertHandler.ENFORCE_ACTION_TYPE, "LOG");
|
||||||
|
runner.setProperty(logHandler, AlertHandler.ENFORCE_ACTION_TYPE_LEVEL, "IGNORE");
|
||||||
|
runner.enableControllerService(logHandler);
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
attributes.put("logLevel", "FAKE");
|
||||||
|
|
||||||
|
final String expectedMessage = "--------------------------------------------------\n" +
|
||||||
|
"Log Message: Rules Action Triggered Log.\n" +
|
||||||
|
"Log Facts:\n" +
|
||||||
|
"Field: cpu, Value: 90\n" +
|
||||||
|
"Field: jvmHeap, Value: 1000000";
|
||||||
|
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("FAKE");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
logHandler.execute(action, metrics);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String debugMessage = mockComponentLog.getDebugMessage();
|
||||||
|
assertTrue(StringUtils.isNotEmpty(debugMessage));
|
||||||
|
TestCase.assertEquals("This Action Handler does not support actions with the provided type: FAKE",debugMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidActionType() {
|
||||||
|
runner.disableControllerService(logHandler);
|
||||||
|
runner.setProperty(logHandler, AlertHandler.ENFORCE_ACTION_TYPE, "LOG");
|
||||||
|
runner.enableControllerService(logHandler);
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final Map<String, Object> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
attributes.put("logLevel", "FAKE");
|
||||||
|
|
||||||
|
final String expectedMessage = "--------------------------------------------------\n" +
|
||||||
|
"Log Message: Rules Action Triggered Log.\n" +
|
||||||
|
"Log Facts:\n" +
|
||||||
|
"Field: cpu, Value: 90\n" +
|
||||||
|
"Field: jvmHeap, Value: 1000000";
|
||||||
|
|
||||||
|
metrics.put("jvmHeap", "1000000");
|
||||||
|
metrics.put("cpu", "90");
|
||||||
|
|
||||||
|
final Action action = new Action();
|
||||||
|
action.setType("LOG");
|
||||||
|
action.setAttributes(attributes);
|
||||||
|
try {
|
||||||
|
logHandler.execute(action, metrics);
|
||||||
|
assertTrue(true);
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class MockLogHandler extends LogHandler {
|
private static class MockLogHandler extends LogHandler {
|
||||||
private ComponentLog testLogger;
|
private ComponentLog testLogger;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue