NIFI-552: added regex properties for include and ignore filters in LogAttribute

This closes #1981

Signed-off-by: Tony Kurc <tkurc@apache.org>
This commit is contained in:
m-hogue 2017-07-05 17:10:58 -04:00 committed by Tony Kurc
parent 47eece5798
commit e6b166a3a2
2 changed files with 297 additions and 15 deletions

View File

@ -20,13 +20,14 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -49,6 +50,8 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import com.google.common.collect.Sets;
@EventDriven @EventDriven
@SideEffectFree @SideEffectFree
@SupportsBatching @SupportsBatching
@ -66,15 +69,34 @@ public class LogAttribute extends AbstractProcessor {
public static final PropertyDescriptor ATTRIBUTES_TO_LOG_CSV = new PropertyDescriptor.Builder() public static final PropertyDescriptor ATTRIBUTES_TO_LOG_CSV = new PropertyDescriptor.Builder()
.name("Attributes to Log") .name("Attributes to Log")
.required(false) .required(false)
.description("A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.") .description("A comma-separated list of Attributes to Log. If not specified, all attributes will be logged unless `Attributes to Log by Regular Expression` is modified." +
" There's an AND relationship between the two properties.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
/**
 * Optional property holding a regular expression that selects which attributes are logged.
 * Defaults to {@code .*}. In {@code getAttributesToLog} the compiled pattern is applied with
 * {@code Matcher.matches()}, so the expression has to match the entire attribute name, and an
 * attribute is only logged when it also passes the {@code Attributes to Log} check.
 */
public static final PropertyDescriptor ATTRIBUTES_TO_LOG_REGEX = new PropertyDescriptor.Builder()
.name("attributes-to-log-regex")
.displayName("Attributes to Log by Regular Expression")
.required(false)
.defaultValue(".*")
.description("A regular expression indicating the Attributes to Log. If not specified, all attributes will be logged unless `Attributes to Log` is modified." +
" There's an AND relationship between the two properties.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor ATTRIBUTES_TO_IGNORE_CSV = new PropertyDescriptor.Builder() public static final PropertyDescriptor ATTRIBUTES_TO_IGNORE_CSV = new PropertyDescriptor.Builder()
.name("Attributes to Ignore") .name("Attributes to Ignore")
.description("A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.") .description("A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored unless `Attributes to Ignore by Regular Expression` is modified." +
" There's an OR relationship between the two properties.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
/**
 * Optional property holding a regular expression that selects which attributes are excluded
 * from the log. It has no default value; {@code getAttributesToLog} skips the regex check when
 * the value is {@code null}. When set, the compiled pattern is applied with
 * {@code Matcher.matches()}, so the expression has to match the entire attribute name, and an
 * attribute is dropped if it matches here or is listed in {@code Attributes to Ignore}.
 */
public static final PropertyDescriptor ATTRIBUTES_TO_IGNORE_REGEX = new PropertyDescriptor.Builder()
.name("attributes-to-ignore-regex")
.displayName("Attributes to Ignore by Regular Expression")
.required(false)
.description("A regular expression indicating the Attributes to Ignore. If not specified, no attributes will be ignored unless `Attributes to Ignore` is modified." +
" There's an OR relationship between the two properties.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor LOG_PAYLOAD = new PropertyDescriptor.Builder() public static final PropertyDescriptor LOG_PAYLOAD = new PropertyDescriptor.Builder()
.name("Log Payload") .name("Log Payload")
.required(true) .required(true)
@ -127,7 +149,9 @@ public class LogAttribute extends AbstractProcessor {
supDescriptors.add(LOG_LEVEL); supDescriptors.add(LOG_LEVEL);
supDescriptors.add(LOG_PAYLOAD); supDescriptors.add(LOG_PAYLOAD);
supDescriptors.add(ATTRIBUTES_TO_LOG_CSV); supDescriptors.add(ATTRIBUTES_TO_LOG_CSV);
supDescriptors.add(ATTRIBUTES_TO_LOG_REGEX);
supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV); supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
supDescriptors.add(ATTRIBUTES_TO_IGNORE_REGEX);
supDescriptors.add(LOG_PREFIX); supDescriptors.add(LOG_PREFIX);
supDescriptors.add(CHARSET); supDescriptors.add(CHARSET);
supportedDescriptors = Collections.unmodifiableList(supDescriptors); supportedDescriptors = Collections.unmodifiableList(supDescriptors);
@ -217,21 +241,27 @@ public class LogAttribute extends AbstractProcessor {
} }
private Set<String> getAttributesToLog(final Set<String> flowFileAttrKeys, final ProcessContext context) { private Set<String> getAttributesToLog(final Set<String> flowFileAttrKeys, final ProcessContext context) {
final Set<String> result = new TreeSet<>();
// collect properties
final String attrsToLogValue = context.getProperty(ATTRIBUTES_TO_LOG_CSV).getValue(); final String attrsToLogValue = context.getProperty(ATTRIBUTES_TO_LOG_CSV).getValue();
if (StringUtils.isBlank(attrsToLogValue)) {
result.addAll(flowFileAttrKeys);
} else {
result.addAll(Arrays.asList(attrsToLogValue.split("\\s*,\\s*")));
}
final String attrsToRemoveValue = context.getProperty(ATTRIBUTES_TO_IGNORE_CSV).getValue(); final String attrsToRemoveValue = context.getProperty(ATTRIBUTES_TO_IGNORE_CSV).getValue();
if (StringUtils.isNotBlank(attrsToRemoveValue)) { final Set<String> attrsToLog = StringUtils.isBlank(attrsToLogValue) ? Sets.newHashSet(flowFileAttrKeys) : Sets.newHashSet(attrsToLogValue.split("\\s*,\\s*"));
result.removeAll(Arrays.asList(attrsToRemoveValue.split("\\s*,\\s*"))); final Set<String> attrsToRemove = StringUtils.isBlank(attrsToRemoveValue) ? Sets.newHashSet() : Sets.newHashSet(attrsToRemoveValue.split("\\s*,\\s*"));
final Pattern attrsToLogRegex = Pattern.compile(context.getProperty(ATTRIBUTES_TO_LOG_REGEX).getValue());
final String attrsToRemoveRegexValue = context.getProperty(ATTRIBUTES_TO_IGNORE_REGEX).getValue();
final Pattern attrsToRemoveRegex = attrsToRemoveRegexValue == null ? null : Pattern.compile(context.getProperty(ATTRIBUTES_TO_IGNORE_REGEX).getValue());
return flowFileAttrKeys.stream()
.filter(candidate -> {
// we'll consider logging an attribute if either no explicit attributes to log were configured,
// if this property was configured to be logged, or if the regular expression of properties to log matches
if ((attrsToLog.isEmpty() || attrsToLog.contains(candidate)) && attrsToLogRegex.matcher(candidate).matches()) {
// log properties we've _not_ configured either explicitly or by regular expression to be ignored.
if ((attrsToRemove.isEmpty() || !attrsToRemove.contains(candidate)) && (attrsToRemoveRegex == null || !attrsToRemoveRegex.matcher(candidate).matches())) {
return true;
} }
}
return result; return false;
}).collect(Collectors.toCollection(TreeSet::new));
} }
private void transferChunk(final ProcessSession session) { private void transferChunk(final ProcessSession session) {

View File

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNot.not;
import java.util.Map;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import com.google.common.collect.Maps;
/**
 * Tests for the attribute include/ignore filtering of {@link LogAttribute}, exercising the
 * comma-separated and regular-expression properties on their own and in combination.
 *
 * <p>Every test logs the same flow file, which carries the attributes {@code foo}, {@code bar}
 * and {@code foobaz}, and then checks which attribute values appear in the returned message.
 */
public class TestLogAttribute {

    /**
     * Enqueues the shared sample flow file on the given runner and returns the String that
     * {@code processFlowFile} produces for it at the {@code info} level.
     *
     * @param processor the processor instance the runner was created for
     * @param runner    the runner, with the properties under test already set
     * @return the log message returned by the processor
     */
    private static String logSampleFlowFile(final LogAttribute processor, final TestRunner runner) {
        final ProcessContext processContext = runner.getProcessContext();
        final ProcessSession processSession = runner.getProcessSessionFactory().createSession();
        final MockComponentLog componentLog = runner.getLogger();

        final Map<String, String> attributes = Maps.newHashMap();
        attributes.put("foo", "foo-value");
        attributes.put("bar", "bar-value");
        attributes.put("foobaz", "foobaz-value");

        final MockFlowFile sample = runner.enqueue("content", attributes);
        return processor.processFlowFile(componentLog, LogAttribute.DebugLevels.info, sample, processSession, processContext);
    }

    /** Only the attributes named in the CSV include list are logged. */
    @Test
    public void testLogPropertyCSVNoIgnore() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_CSV, "foo, bar");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, not(containsString("foobaz-value")));
        assertThat(logged, containsString("foo-value"));
        assertThat(logged, containsString("bar-value"));
    }

    /** Only the attributes whose names match the include regex are logged. */
    @Test
    public void testLogPropertyRegexNoIgnore() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_REGEX, "foo.*");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, containsString("foobaz-value"));
        assertThat(logged, containsString("foo-value"));
        assertThat(logged, not(containsString("bar-value")));
    }

    /** The CSV and regex include filters are ANDed together. */
    @Test
    public void testLogPropertyWithCSVAndRegexNoIgnore() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        // the two include properties must both accept an attribute, so only foo is expected in the output
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_CSV, "foo, bar");
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_REGEX, "foo*");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, not(containsString("foobaz-value")));
        assertThat(logged, containsString("foo-value"));
        assertThat(logged, not(containsString("bar-value")));
    }

    /** Attributes named in the CSV ignore list are left out. */
    @Test
    public void testLogPropertyWithIgnoreCSV() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_CSV, "bar");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, containsString("foobaz-value"));
        assertThat(logged, containsString("foo-value"));
        assertThat(logged, not(containsString("bar-value")));
    }

    /** Attributes whose names match the ignore regex are left out. */
    @Test
    public void testLogPropertyWithIgnoreRegex() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_REGEX, "foo.*");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, not(containsString("foobaz-value")));
        assertThat(logged, not(containsString("foo-value")));
        assertThat(logged, containsString("bar-value"));
    }

    /** The CSV and regex ignore filters are ORed together. */
    @Test
    public void testLogPropertyWithIgnoreCSVAndRegex() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        // either ignore property is enough to drop an attribute: the CSV removes foo and bar,
        // the regex removes everything starting with foo, which covers every attribute we add
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_CSV, "foo,bar");
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_REGEX, "foo.*");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, not(containsString("foobaz-value")));
        assertThat(logged, not(containsString("foo-value")));
        assertThat(logged, not(containsString("bar-value")));
    }

    /** An attribute that is both included by CSV and ignored by regex is not logged. */
    @Test
    public void testLogPropertyCSVWithIgnoreRegex() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        // the same attribute is included and ignored, so nothing should be left to log
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_CSV, "foo");
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_REGEX, "foo.*");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, not(containsString("foobaz-value")));
        assertThat(logged, not(containsString("foo-value")));
        assertThat(logged, not(containsString("bar-value")));
    }

    /** The CSV ignore list removes entries from the CSV include list. */
    @Test
    public void testLogPropertyCSVWithIgnoreCSV() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        // include foo and foobaz, then ignore foobaz
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_CSV, "foo,foobaz");
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_CSV, "foobaz");

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, not(containsString("foobaz-value")));
        assertThat(logged, containsString("foo-value"));
        assertThat(logged, not(containsString("bar-value")));
    }

    /** The ignore regex removes attributes that the include regex accepted. */
    @Test
    public void testLogPropertyRegexWithIgnoreRegex() {
        final LogAttribute processor = new LogAttribute();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_REGEX, "foo.*"); // accepts foo and foobaz
        runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_REGEX, "foobaz.*"); // rejects foobaz

        final String logged = logSampleFlowFile(processor, runner);

        assertThat(logged, not(containsString("foobaz-value")));
        assertThat(logged, containsString("foo-value"));
        assertThat(logged, not(containsString("bar-value")));
    }
}