mirror of https://github.com/apache/nifi.git
NIFI-9748: Added new property for Output Format to LogAttribute. Also made the FlowFile Properties (file size, entry date, lineage start date) optional and renamed from 'Standard FlowFile Attributes' to 'FlowFile Properties' because this has led to confusion many times in the past, around users wanting to reference these things as attributes via EL but they are not actually attributes.
This closes #5825 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
8959226b50
commit
65dd62716a
|
@ -16,20 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.nio.charset.Charset;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
@ -39,6 +25,7 @@ import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -53,6 +40,22 @@ import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.eclipse.jetty.util.StringUtil;
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
|
@ -60,6 +63,10 @@ import org.eclipse.jetty.util.StringUtil;
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@CapabilityDescription("Emits attributes of the FlowFile at the specified log level")
|
@CapabilityDescription("Emits attributes of the FlowFile at the specified log level")
|
||||||
public class LogAttribute extends AbstractProcessor {
|
public class LogAttribute extends AbstractProcessor {
|
||||||
|
private static final AllowableValue OUTPUT_FORMAT_LINE_PER_ATTRIBUTE = new AllowableValue("Line per Attribute", "Line per Attribute", "Each FlowFile attribute will be logged using a single line" +
|
||||||
|
" for the attribute name and another line for the attribute value. This format is often most advantageous when looking at the attributes of a single FlowFile.");
|
||||||
|
private static final AllowableValue OUTPUT_FORMAT_SINGLE_LINE = new AllowableValue("Single Line", "Single Line", "All FlowFile attribute names and values will be logged on a single line. This " +
|
||||||
|
"format is often most advantageous when comparing logs from multiple FlowFiles.");
|
||||||
|
|
||||||
public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
|
||||||
.name("Log Level")
|
.name("Log Level")
|
||||||
|
@ -99,6 +106,14 @@ public class LogAttribute extends AbstractProcessor {
|
||||||
" There's an OR relationship between the two properties.")
|
" There's an OR relationship between the two properties.")
|
||||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor OUTPUT_FORMAT = new PropertyDescriptor.Builder()
|
||||||
|
.name("Output Format")
|
||||||
|
.displayName("Output Format")
|
||||||
|
.description("Specifies the format to use for logging FlowFile attributes")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(OUTPUT_FORMAT_LINE_PER_ATTRIBUTE, OUTPUT_FORMAT_SINGLE_LINE)
|
||||||
|
.defaultValue(OUTPUT_FORMAT_LINE_PER_ATTRIBUTE.getValue())
|
||||||
|
.build();
|
||||||
public static final PropertyDescriptor LOG_PAYLOAD = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor LOG_PAYLOAD = new PropertyDescriptor.Builder()
|
||||||
.name("Log Payload")
|
.name("Log Payload")
|
||||||
.required(true)
|
.required(true)
|
||||||
|
@ -106,7 +121,14 @@ public class LogAttribute extends AbstractProcessor {
|
||||||
.defaultValue("false")
|
.defaultValue("false")
|
||||||
.allowableValues("true", "false")
|
.allowableValues("true", "false")
|
||||||
.build();
|
.build();
|
||||||
|
static final PropertyDescriptor LOG_FLOWFILE_PROPERTIES = new PropertyDescriptor.Builder()
|
||||||
|
.name("Log FlowFile Properties")
|
||||||
|
.displayName("Log FlowFile Properties")
|
||||||
|
.description("Specifies whether or not to log FlowFile \"properties\", such as Entry Date, Lineage Start Date, and content size")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.build();
|
||||||
public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
|
||||||
.name("Log prefix")
|
.name("Log prefix")
|
||||||
.required(false)
|
.required(false)
|
||||||
|
@ -154,6 +176,8 @@ public class LogAttribute extends AbstractProcessor {
|
||||||
supDescriptors.add(ATTRIBUTES_TO_LOG_REGEX);
|
supDescriptors.add(ATTRIBUTES_TO_LOG_REGEX);
|
||||||
supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
|
supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
|
||||||
supDescriptors.add(ATTRIBUTES_TO_IGNORE_REGEX);
|
supDescriptors.add(ATTRIBUTES_TO_IGNORE_REGEX);
|
||||||
|
supDescriptors.add(LOG_FLOWFILE_PROPERTIES);
|
||||||
|
supDescriptors.add(OUTPUT_FORMAT);
|
||||||
supDescriptors.add(LOG_PREFIX);
|
supDescriptors.add(LOG_PREFIX);
|
||||||
supDescriptors.add(CHARSET);
|
supDescriptors.add(CHARSET);
|
||||||
supportedDescriptors = Collections.unmodifiableList(supDescriptors);
|
supportedDescriptors = Collections.unmodifiableList(supDescriptors);
|
||||||
|
@ -171,7 +195,6 @@ public class LogAttribute extends AbstractProcessor {
|
||||||
|
|
||||||
protected String processFlowFile(final ComponentLog logger, final DebugLevels logLevel, final FlowFile flowFile, final ProcessSession session, final ProcessContext context) {
|
protected String processFlowFile(final ComponentLog logger, final DebugLevels logLevel, final FlowFile flowFile, final ProcessSession session, final ProcessContext context) {
|
||||||
final Set<String> attributeKeys = getAttributesToLog(flowFile.getAttributes().keySet(), context);
|
final Set<String> attributeKeys = getAttributesToLog(flowFile.getAttributes().keySet(), context);
|
||||||
final ComponentLog LOG = getLogger();
|
|
||||||
final String dashedLine;
|
final String dashedLine;
|
||||||
|
|
||||||
String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
|
String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
@ -188,21 +211,44 @@ public class LogAttribute extends AbstractProcessor {
|
||||||
dashedLine = StringUtils.repeat('-', 5) + logPrefix + StringUtils.repeat('-', 5);
|
dashedLine = StringUtils.repeat('-', 5) + logPrefix + StringUtils.repeat('-', 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
|
||||||
|
final boolean logProperties = context.getProperty(LOG_FLOWFILE_PROPERTIES).asBoolean();
|
||||||
|
|
||||||
// Pretty print metadata
|
// Pretty print metadata
|
||||||
final StringBuilder message = new StringBuilder();
|
final StringBuilder message = new StringBuilder();
|
||||||
message.append("logging for flow file ").append(flowFile);
|
message.append("logging for flow file ").append(flowFile);
|
||||||
message.append("\n");
|
message.append("\n");
|
||||||
message.append(dashedLine);
|
|
||||||
message.append("\nStandard FlowFile Attributes");
|
if (OUTPUT_FORMAT_LINE_PER_ATTRIBUTE.getValue().equalsIgnoreCase(outputFormat)) {
|
||||||
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
|
message.append(dashedLine);
|
||||||
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "lineageStartDate", new Date(flowFile.getLineageStartDate())));
|
|
||||||
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "fileSize", flowFile.getSize()));
|
if (logProperties) {
|
||||||
message.append("\nFlowFile Attribute Map Content");
|
message.append("\nFlowFile Properties");
|
||||||
for (final String key : attributeKeys) {
|
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
|
||||||
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", key, flowFile.getAttribute(key)));
|
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "lineageStartDate", new Date(flowFile.getLineageStartDate())));
|
||||||
|
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "fileSize", flowFile.getSize()));
|
||||||
|
}
|
||||||
|
|
||||||
|
message.append("\nFlowFile Attribute Map Content");
|
||||||
|
for (final String key : attributeKeys) {
|
||||||
|
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", key, flowFile.getAttribute(key)));
|
||||||
|
}
|
||||||
|
message.append("\n");
|
||||||
|
message.append(dashedLine);
|
||||||
|
} else {
|
||||||
|
final Map<String, String> attributes = new TreeMap<>();
|
||||||
|
for (final String key : attributeKeys) {
|
||||||
|
attributes.put(key, flowFile.getAttribute(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logProperties) {
|
||||||
|
attributes.put("entryDate", new Date(flowFile.getEntryDate()).toString());
|
||||||
|
attributes.put("lineageStartDate", new Date(flowFile.getLineageStartDate()).toString());
|
||||||
|
attributes.put("fileSize", String.valueOf(flowFile.getSize()));
|
||||||
|
}
|
||||||
|
|
||||||
|
message.append("FlowFile Properties: ").append(attributes);
|
||||||
}
|
}
|
||||||
message.append("\n");
|
|
||||||
message.append(dashedLine);
|
|
||||||
|
|
||||||
// The user can request to log the payload
|
// The user can request to log the payload
|
||||||
final boolean logPayload = context.getProperty(LOG_PAYLOAD).asBoolean();
|
final boolean logPayload = context.getProperty(LOG_PAYLOAD).asBoolean();
|
||||||
|
@ -220,22 +266,23 @@ public class LogAttribute extends AbstractProcessor {
|
||||||
// Uses optional property to specify logging level
|
// Uses optional property to specify logging level
|
||||||
switch (logLevel) {
|
switch (logLevel) {
|
||||||
case info:
|
case info:
|
||||||
LOG.info(outputMessage);
|
logger.info(outputMessage);
|
||||||
break;
|
break;
|
||||||
case debug:
|
case debug:
|
||||||
LOG.debug(outputMessage);
|
logger.debug(outputMessage);
|
||||||
break;
|
break;
|
||||||
case warn:
|
case warn:
|
||||||
LOG.warn(outputMessage);
|
logger.warn(outputMessage);
|
||||||
break;
|
break;
|
||||||
case trace:
|
case trace:
|
||||||
LOG.trace(outputMessage);
|
logger.trace(outputMessage);
|
||||||
break;
|
break;
|
||||||
case error:
|
case error:
|
||||||
LOG.error(outputMessage);
|
logger.error(outputMessage);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.debug(outputMessage);
|
logger.debug(outputMessage);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return outputMessage;
|
return outputMessage;
|
||||||
|
@ -337,4 +384,5 @@ public class LogAttribute extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue