From bf15502e1994c68d706a2a2749495b57fd9a11e4 Mon Sep 17 00:00:00 2001 From: "Peter G. Horvath" Date: Mon, 15 May 2017 18:10:26 +0200 Subject: [PATCH] NIFI-3763 Add new processor to log user defined messages built with NiFi Expression Language Signed-off-by: Matt Burgess This closes #1737 --- .../src/main/resources/conf/logback.xml | 1 + .../nifi/processors/standard/LogMessage.java | 211 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestLogMessage.java | 130 +++++++++++ 4 files changed, 343 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml index f2da2004f5..da1adf051d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml @@ -86,6 +86,7 @@ + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java new file mode 100644 index 0000000000..2d6b318f94 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.eclipse.jetty.util.StringUtil; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"attributes", "logging"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Emits a log message at the specified log level") +public class LogMessage extends AbstractProcessor { + + public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder() + .name("log-level") + .displayName("Log Level") + .required(true) + .description("The Log Level to use when logging the message") + .allowableValues(MessageLogLevel.values()) + .defaultValue(MessageLogLevel.info.toString()) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder() + .name("log-prefix") + .displayName("Log prefix") + .required(false) + .description("Log prefix appended to the log lines. " + + "It helps to distinguish the output of multiple LogMessage processors.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor LOG_MESSAGE = new PropertyDescriptor.Builder() + .name("log-message") + .displayName("Log message") + .required(false) + .description("The log message to emit") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are routed to this relationship") + .build(); + + private static final int CHUNK_SIZE = 50; + + enum MessageLogLevel { + + trace, debug, info, warn, error + } + + private Set relationships; + private List supportedDescriptors; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set procRels = new HashSet<>(); + procRels.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(procRels); + + // descriptors + final List supDescriptors = new ArrayList<>(); + supDescriptors.add(LOG_LEVEL); + supDescriptors.add(LOG_PREFIX); + supDescriptors.add(LOG_MESSAGE); + supportedDescriptors = Collections.unmodifiableList(supDescriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return supportedDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + + final String logLevelValue = context.getProperty(LOG_LEVEL).getValue().toLowerCase(); + + final MessageLogLevel logLevel; + try { + logLevel = MessageLogLevel.valueOf(logLevelValue); + } catch (Exception e) { + throw new ProcessException(e); + } + + final ComponentLog logger = getLogger(); + boolean isLogLevelEnabled = false; + switch (logLevel) { + case trace: + isLogLevelEnabled = logger.isTraceEnabled(); + break; + case debug: + isLogLevelEnabled = logger.isDebugEnabled(); + break; + case info: + isLogLevelEnabled = logger.isInfoEnabled(); + break; + case warn: + isLogLevelEnabled = logger.isWarnEnabled(); + break; + case error: + isLogLevelEnabled = logger.isErrorEnabled(); + break; + } + + if (!isLogLevelEnabled) { + transferChunk(session); + return; + } + + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + processFlowFile(logger, logLevel, flowFile, context); + session.transfer(flowFile, REL_SUCCESS); + } + + private void processFlowFile( + final ComponentLog logger, + final MessageLogLevel logLevel, + final FlowFile flowFile, + final ProcessContext context) { + + String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue(); + String logMessage = context.getProperty(LOG_MESSAGE).evaluateAttributeExpressions(flowFile).getValue(); + + String messageToWrite; + if (StringUtil.isBlank(logPrefix)) { + messageToWrite = logMessage; + } else { + messageToWrite = String.format("%s%s", logPrefix, logMessage); + } + + // Uses optional property to specify logging level + switch (logLevel) { + case info: + logger.info(messageToWrite); + break; + case debug: + logger.debug(messageToWrite); + break; + case warn: + logger.warn(messageToWrite); + break; + case trace: + logger.trace(messageToWrite); + break; + case error: + logger.error(messageToWrite); + break; + default: + logger.debug(messageToWrite); + } + } + + private void transferChunk(final ProcessSession session) { + final List flowFiles = session.get(CHUNK_SIZE); + if (!flowFiles.isEmpty()) { + session.transfer(flowFiles, REL_SUCCESS); + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 221fe0a91d..f345524011 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -58,6 +58,7 @@ org.apache.nifi.processors.standard.ListenTCP org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListSFTP org.apache.nifi.processors.standard.LogAttribute +org.apache.nifi.processors.standard.LogMessage org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.ModifyBytes org.apache.nifi.processors.standard.MonitorActivity diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java new file mode 100644 index 0000000000..98a8952d5b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; + +public class TestLogMessage { + + private TestableLogMessage testableLogMessage; + private TestRunner runner; + + private static class TestableLogMessage extends LogMessage { + + MockComponentLog getMockComponentLog() { + ComponentLog mockLog = getLogger(); + + if (!(mockLog instanceof MockComponentLog)) { + throw new IllegalStateException("Logger is expected to be MockComponentLog, but was: " + + mockLog.getClass()); + } + + return (MockComponentLog) mockLog; + } + + + } + + @Before + public void before() throws InitializationException { + testableLogMessage = new TestableLogMessage(); + runner = TestRunners.newTestRunner(testableLogMessage); + + } + + @After + public void after() throws InitializationException { + runner.shutdown(); + } + + @Test + public void testInfoMessageLogged() throws InitializationException, IOException { + + runner.setProperty(LogMessage.LOG_MESSAGE, "This should help the operator to follow the flow: ${foobar}"); + runner.setProperty(LogMessage.LOG_LEVEL, LogMessage.MessageLogLevel.info.toString()); + + HashMap flowAttributes = new HashMap<>(); + flowAttributes.put("foobar", "baz"); + + runner.enqueue("This is a message!", flowAttributes); + runner.setValidateExpressionUsage(false); + + runner.run(); + + List successFlowFiles = runner.getFlowFilesForRelationship(LogMessage.REL_SUCCESS); + Assert.assertEquals(1, successFlowFiles.size()); + + MockComponentLog mockComponentLog = testableLogMessage.getMockComponentLog(); + List infoMessages = mockComponentLog.getInfoMessages(); + Assert.assertEquals(1, infoMessages.size()); + Assert.assertTrue(infoMessages.get(0).getMsg() + .endsWith("This should help the operator to follow the flow: baz")); + + + Assert.assertTrue(mockComponentLog.getTraceMessages().isEmpty()); + Assert.assertTrue(mockComponentLog.getDebugMessages().isEmpty()); + Assert.assertTrue(mockComponentLog.getWarnMessages().isEmpty()); + Assert.assertTrue(mockComponentLog.getErrorMessages().isEmpty()); + } + + @Test + public void testInfoMessageWithPrefixLogged() throws InitializationException, IOException { + + runner.setProperty(LogMessage.LOG_PREFIX, "FOOBAR>>>"); + runner.setProperty(LogMessage.LOG_MESSAGE, "This should help the operator to follow the flow: ${foobar}"); + runner.setProperty(LogMessage.LOG_LEVEL, LogMessage.MessageLogLevel.info.toString()); + + HashMap flowAttributes = new HashMap<>(); + flowAttributes.put("foobar", "baz"); + + runner.enqueue("This is a message!", flowAttributes); + runner.setValidateExpressionUsage(false); + + runner.run(); + + List successFlowFiles = runner.getFlowFilesForRelationship(LogMessage.REL_SUCCESS); + Assert.assertEquals(1, successFlowFiles.size()); + + MockComponentLog mockComponentLog = testableLogMessage.getMockComponentLog(); + List infoMessages = mockComponentLog.getInfoMessages(); + Assert.assertEquals(1, infoMessages.size()); + Assert.assertTrue(infoMessages.get(0).getMsg() + .endsWith("FOOBAR>>>This should help the operator to follow the flow: baz")); + + + + Assert.assertTrue(mockComponentLog.getTraceMessages().isEmpty()); + Assert.assertTrue(mockComponentLog.getDebugMessages().isEmpty()); + Assert.assertTrue(mockComponentLog.getWarnMessages().isEmpty()); + Assert.assertTrue(mockComponentLog.getErrorMessages().isEmpty()); + } + +}