NIFI-985: Custom log prefix for LogAttribute processor

The LogAttribute processor evaluates Expression Language (EL) in the log prefix against the current flow file.
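
For illustration, here is a minimal sketch using the nifi-mock TestRunner (the 'step' attribute and the sketch class are hypothetical, not part of this commit) of how an EL-based prefix is resolved per flow file:

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.processors.standard.LogAttribute;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class LogPrefixExpressionSketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(new LogAttribute());
        // Hypothetical configuration: the prefix may reference flow file attributes
        // because the property is evaluated with evaluateAttributeExpressions(flowFile).
        runner.setProperty(LogAttribute.LOG_PREFIX, "${step}: ");

        Map<String, String> attributes = new HashMap<>();
        attributes.put("step", "STEP 1");
        runner.enqueue("flow file content".getBytes(), attributes);

        // Each logged block for this flow file is framed by a separator line
        // containing "STEP 1: ".
        runner.run();
    }
}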

The log prefix helps to distinguish the output of multiple LogAttribute processors and to identify the right one. It appears in the first and the last line of each logged block, together with the original 50-dash separator. With the log prefix configured as 'STEP 1: ', the log output looks like this:

STEP 1 :  --------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
        Value: 'Tue Sep 22 15:13:02 CEST 2015'
Key: 'lineageStartDate'
        Value: 'Tue Sep 22 15:13:02 CEST 2015'
Key: 'fileSize'
        Value: '9'
FlowFile Attribute Map Content
Key: 'customAttribute'
        Value: 'custom value'
STEP 1 :  --------------------------------------------------

flow file content...
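
For reference, a standalone sketch (assuming commons-lang3 on the classpath; not part of the patch) of how the separator line is built by the code in the diff below: the prefix is abbreviated to at most 40 characters, centered in a 40-character dash-padded field, and framed by five dashes on each side, so the line stays 50 characters wide.

import org.apache.commons.lang3.StringUtils;

public class DashedLineSketch {
    public static void main(String[] args) {
        String logPrefix = "STEP 1: ";
        // Shorten overly long prefixes so the separator line stays 50 characters wide.
        logPrefix = StringUtils.abbreviate(logPrefix, 40);
        // Center the prefix within a 40-character field, padding with dashes.
        logPrefix = StringUtils.center(logPrefix, 40, '-');
        // Five fixed dashes on each side complete the 50-character line.
        String dashedLine = StringUtils.repeat('-', 5) + logPrefix + StringUtils.repeat('-', 5);
        System.out.println(dashedLine.length()); // 50
        System.out.println(dashedLine);          // the prefix framed by dashes
    }
}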

Signed-off-by: Aldrin Piri <aldrin@apache.org>
Authored by Joe on 2015-09-22 15:04:37 +02:00; committed by Aldrin Piri
parent 26f80095b7
commit e12da7c9a3
1 changed file with 26 additions and 2 deletions

@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.jetty.util.StringUtil;
@EventDriven
@SideEffectFree
@@ -79,6 +80,14 @@ public class LogAttribute extends AbstractProcessor {
            .allowableValues("true", "false")
            .build();

    public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
            .name("Log prefix")
            .required(false)
            .description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    public static final String FIFTY_DASHES = "--------------------------------------------------";

    public static enum DebugLevels {
@@ -107,6 +116,7 @@ public class LogAttribute extends AbstractProcessor {
        supDescriptors.add(LOG_PAYLOAD);
        supDescriptors.add(ATTRIBUTES_TO_LOG_CSV);
        supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
        supDescriptors.add(LOG_PREFIX);
        supportedDescriptors = Collections.unmodifiableList(supDescriptors);
    }
@@ -123,12 +133,26 @@ public class LogAttribute extends AbstractProcessor {
    protected String processFlowFile(final ProcessorLog logger, final DebugLevels logLevel, final FlowFile flowFile, final ProcessSession session, final ProcessContext context) {
        final Set<String> attributeKeys = getAttributesToLog(flowFile.getAttributes().keySet(), context);
        final ProcessorLog LOG = getLogger();

        final String dashedLine;
        String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();

        if (StringUtil.isBlank(logPrefix)) {
            dashedLine = StringUtils.repeat('-', 50);
        } else {
            // abbreviate long lines
            logPrefix = StringUtils.abbreviate(logPrefix, 40);
            // center the logPrefix and pad with dashes
            logPrefix = StringUtils.center(logPrefix, 40, '-');
            // five dashes on the left and right side, plus the dashed logPrefix
            dashedLine = StringUtils.repeat('-', 5) + logPrefix + StringUtils.repeat('-', 5);
        }

        // Pretty print metadata
        final StringBuilder message = new StringBuilder();
        message.append("logging for flow file ").append(flowFile);
        message.append("\n");
        message.append(FIFTY_DASHES);
        message.append(dashedLine);
        message.append("\nStandard FlowFile Attributes");
        message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
        message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "lineageStartDate", new Date(flowFile.getLineageStartDate())));
@@ -138,7 +162,7 @@ public class LogAttribute extends AbstractProcessor {
            message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", key, flowFile.getAttribute(key)));
        }
        message.append("\n");
        message.append(FIFTY_DASHES);
        message.append(dashedLine);

        // The user can request to log the payload
        final boolean logPayload = context.getProperty(LOG_PAYLOAD).asBoolean();