mirror of https://github.com/apache/nifi.git
NIFI-3763 Add new processor to log user defined messages built with NiFi Expression Language
Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #1737
This commit is contained in:
parent
1811ba5681
commit
bf15502e19
|
@ -86,6 +86,7 @@
|
||||||
<logger name="org.apache.nifi" level="INFO"/>
|
<logger name="org.apache.nifi" level="INFO"/>
|
||||||
<logger name="org.apache.nifi.processors" level="WARN"/>
|
<logger name="org.apache.nifi.processors" level="WARN"/>
|
||||||
<logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
|
<logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
|
||||||
|
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
|
||||||
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
|
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,211 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
@EventDriven
|
||||||
|
@SideEffectFree
|
||||||
|
@SupportsBatching
|
||||||
|
@Tags({"attributes", "logging"})
|
||||||
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
|
@CapabilityDescription("Emits a log message at the specified log level")
|
||||||
|
public class LogMessage extends AbstractProcessor {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
|
||||||
|
.name("log-level")
|
||||||
|
.displayName("Log Level")
|
||||||
|
.required(true)
|
||||||
|
.description("The Log Level to use when logging the message")
|
||||||
|
.allowableValues(MessageLogLevel.values())
|
||||||
|
.defaultValue(MessageLogLevel.info.toString())
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
|
||||||
|
.name("log-prefix")
|
||||||
|
.displayName("Log prefix")
|
||||||
|
.required(false)
|
||||||
|
.description("Log prefix appended to the log lines. " +
|
||||||
|
"It helps to distinguish the output of multiple LogMessage processors.")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor LOG_MESSAGE = new PropertyDescriptor.Builder()
|
||||||
|
.name("log-message")
|
||||||
|
.displayName("Log message")
|
||||||
|
.required(false)
|
||||||
|
.description("The log message to emit")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
|
.name("success")
|
||||||
|
.description("All FlowFiles are routed to this relationship")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private static final int CHUNK_SIZE = 50;
|
||||||
|
|
||||||
|
enum MessageLogLevel {
|
||||||
|
|
||||||
|
trace, debug, info, warn, error
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<Relationship> relationships;
|
||||||
|
private List<PropertyDescriptor> supportedDescriptors;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
final Set<Relationship> procRels = new HashSet<>();
|
||||||
|
procRels.add(REL_SUCCESS);
|
||||||
|
relationships = Collections.unmodifiableSet(procRels);
|
||||||
|
|
||||||
|
// descriptors
|
||||||
|
final List<PropertyDescriptor> supDescriptors = new ArrayList<>();
|
||||||
|
supDescriptors.add(LOG_LEVEL);
|
||||||
|
supDescriptors.add(LOG_PREFIX);
|
||||||
|
supDescriptors.add(LOG_MESSAGE);
|
||||||
|
supportedDescriptors = Collections.unmodifiableList(supDescriptors);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return supportedDescriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
|
|
||||||
|
final String logLevelValue = context.getProperty(LOG_LEVEL).getValue().toLowerCase();
|
||||||
|
|
||||||
|
final MessageLogLevel logLevel;
|
||||||
|
try {
|
||||||
|
logLevel = MessageLogLevel.valueOf(logLevelValue);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ProcessException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
final ComponentLog logger = getLogger();
|
||||||
|
boolean isLogLevelEnabled = false;
|
||||||
|
switch (logLevel) {
|
||||||
|
case trace:
|
||||||
|
isLogLevelEnabled = logger.isTraceEnabled();
|
||||||
|
break;
|
||||||
|
case debug:
|
||||||
|
isLogLevelEnabled = logger.isDebugEnabled();
|
||||||
|
break;
|
||||||
|
case info:
|
||||||
|
isLogLevelEnabled = logger.isInfoEnabled();
|
||||||
|
break;
|
||||||
|
case warn:
|
||||||
|
isLogLevelEnabled = logger.isWarnEnabled();
|
||||||
|
break;
|
||||||
|
case error:
|
||||||
|
isLogLevelEnabled = logger.isErrorEnabled();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isLogLevelEnabled) {
|
||||||
|
transferChunk(session);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
processFlowFile(logger, logLevel, flowFile, context);
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processFlowFile(
|
||||||
|
final ComponentLog logger,
|
||||||
|
final MessageLogLevel logLevel,
|
||||||
|
final FlowFile flowFile,
|
||||||
|
final ProcessContext context) {
|
||||||
|
|
||||||
|
String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
String logMessage = context.getProperty(LOG_MESSAGE).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
|
||||||
|
String messageToWrite;
|
||||||
|
if (StringUtil.isBlank(logPrefix)) {
|
||||||
|
messageToWrite = logMessage;
|
||||||
|
} else {
|
||||||
|
messageToWrite = String.format("%s%s", logPrefix, logMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Uses optional property to specify logging level
|
||||||
|
switch (logLevel) {
|
||||||
|
case info:
|
||||||
|
logger.info(messageToWrite);
|
||||||
|
break;
|
||||||
|
case debug:
|
||||||
|
logger.debug(messageToWrite);
|
||||||
|
break;
|
||||||
|
case warn:
|
||||||
|
logger.warn(messageToWrite);
|
||||||
|
break;
|
||||||
|
case trace:
|
||||||
|
logger.trace(messageToWrite);
|
||||||
|
break;
|
||||||
|
case error:
|
||||||
|
logger.error(messageToWrite);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
logger.debug(messageToWrite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void transferChunk(final ProcessSession session) {
|
||||||
|
final List<FlowFile> flowFiles = session.get(CHUNK_SIZE);
|
||||||
|
if (!flowFiles.isEmpty()) {
|
||||||
|
session.transfer(flowFiles, REL_SUCCESS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -58,6 +58,7 @@ org.apache.nifi.processors.standard.ListenTCP
|
||||||
org.apache.nifi.processors.standard.ListenUDP
|
org.apache.nifi.processors.standard.ListenUDP
|
||||||
org.apache.nifi.processors.standard.ListSFTP
|
org.apache.nifi.processors.standard.ListSFTP
|
||||||
org.apache.nifi.processors.standard.LogAttribute
|
org.apache.nifi.processors.standard.LogAttribute
|
||||||
|
org.apache.nifi.processors.standard.LogMessage
|
||||||
org.apache.nifi.processors.standard.MergeContent
|
org.apache.nifi.processors.standard.MergeContent
|
||||||
org.apache.nifi.processors.standard.ModifyBytes
|
org.apache.nifi.processors.standard.ModifyBytes
|
||||||
org.apache.nifi.processors.standard.MonitorActivity
|
org.apache.nifi.processors.standard.MonitorActivity
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.util.MockComponentLog;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class TestLogMessage {
|
||||||
|
|
||||||
|
private TestableLogMessage testableLogMessage;
|
||||||
|
private TestRunner runner;
|
||||||
|
|
||||||
|
private static class TestableLogMessage extends LogMessage {
|
||||||
|
|
||||||
|
MockComponentLog getMockComponentLog() {
|
||||||
|
ComponentLog mockLog = getLogger();
|
||||||
|
|
||||||
|
if (!(mockLog instanceof MockComponentLog)) {
|
||||||
|
throw new IllegalStateException("Logger is expected to be MockComponentLog, but was: " +
|
||||||
|
mockLog.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
return (MockComponentLog) mockLog;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws InitializationException {
|
||||||
|
testableLogMessage = new TestableLogMessage();
|
||||||
|
runner = TestRunners.newTestRunner(testableLogMessage);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() throws InitializationException {
|
||||||
|
runner.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInfoMessageLogged() throws InitializationException, IOException {
|
||||||
|
|
||||||
|
runner.setProperty(LogMessage.LOG_MESSAGE, "This should help the operator to follow the flow: ${foobar}");
|
||||||
|
runner.setProperty(LogMessage.LOG_LEVEL, LogMessage.MessageLogLevel.info.toString());
|
||||||
|
|
||||||
|
HashMap<String, String> flowAttributes = new HashMap<>();
|
||||||
|
flowAttributes.put("foobar", "baz");
|
||||||
|
|
||||||
|
runner.enqueue("This is a message!", flowAttributes);
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(LogMessage.REL_SUCCESS);
|
||||||
|
Assert.assertEquals(1, successFlowFiles.size());
|
||||||
|
|
||||||
|
MockComponentLog mockComponentLog = testableLogMessage.getMockComponentLog();
|
||||||
|
List<org.apache.nifi.util.LogMessage> infoMessages = mockComponentLog.getInfoMessages();
|
||||||
|
Assert.assertEquals(1, infoMessages.size());
|
||||||
|
Assert.assertTrue(infoMessages.get(0).getMsg()
|
||||||
|
.endsWith("This should help the operator to follow the flow: baz"));
|
||||||
|
|
||||||
|
|
||||||
|
Assert.assertTrue(mockComponentLog.getTraceMessages().isEmpty());
|
||||||
|
Assert.assertTrue(mockComponentLog.getDebugMessages().isEmpty());
|
||||||
|
Assert.assertTrue(mockComponentLog.getWarnMessages().isEmpty());
|
||||||
|
Assert.assertTrue(mockComponentLog.getErrorMessages().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInfoMessageWithPrefixLogged() throws InitializationException, IOException {
|
||||||
|
|
||||||
|
runner.setProperty(LogMessage.LOG_PREFIX, "FOOBAR>>>");
|
||||||
|
runner.setProperty(LogMessage.LOG_MESSAGE, "This should help the operator to follow the flow: ${foobar}");
|
||||||
|
runner.setProperty(LogMessage.LOG_LEVEL, LogMessage.MessageLogLevel.info.toString());
|
||||||
|
|
||||||
|
HashMap<String, String> flowAttributes = new HashMap<>();
|
||||||
|
flowAttributes.put("foobar", "baz");
|
||||||
|
|
||||||
|
runner.enqueue("This is a message!", flowAttributes);
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(LogMessage.REL_SUCCESS);
|
||||||
|
Assert.assertEquals(1, successFlowFiles.size());
|
||||||
|
|
||||||
|
MockComponentLog mockComponentLog = testableLogMessage.getMockComponentLog();
|
||||||
|
List<org.apache.nifi.util.LogMessage> infoMessages = mockComponentLog.getInfoMessages();
|
||||||
|
Assert.assertEquals(1, infoMessages.size());
|
||||||
|
Assert.assertTrue(infoMessages.get(0).getMsg()
|
||||||
|
.endsWith("FOOBAR>>>This should help the operator to follow the flow: baz"));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Assert.assertTrue(mockComponentLog.getTraceMessages().isEmpty());
|
||||||
|
Assert.assertTrue(mockComponentLog.getDebugMessages().isEmpty());
|
||||||
|
Assert.assertTrue(mockComponentLog.getWarnMessages().isEmpty());
|
||||||
|
Assert.assertTrue(mockComponentLog.getErrorMessages().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue