mirror of https://github.com/apache/nifi.git
NIFI-2169: This closes #2343. Cache compiled regexp for RouteText
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
353fcdda9c
commit
2fbe922a2b
|
@ -31,11 +31,15 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicRelationship;
|
import org.apache.nifi.annotation.behavior.DynamicRelationship;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
@ -209,6 +213,24 @@ public class RouteText extends AbstractProcessor {
|
||||||
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
|
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
|
||||||
private volatile Pattern groupingRegex = null;
|
private volatile Pattern groupingRegex = null;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* LRU cache for the compiled patterns. The size of the cache is determined by the value of
|
||||||
|
* {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
final ConcurrentMap<String, Pattern> patternsCache = CacheBuilder.newBuilder()
|
||||||
|
.maximumSize(PATTERNS_CACHE_MAXIMUM_ENTRIES)
|
||||||
|
.<String, Pattern>build()
|
||||||
|
.asMap();
|
||||||
|
|
||||||
|
private Pattern cachedCompiledPattern(final String regex, final boolean ignoreCase) {
|
||||||
|
return patternsCache.computeIfAbsent(regex,
|
||||||
|
r -> ignoreCase ? Pattern.compile(r, Pattern.CASE_INSENSITIVE) : Pattern.compile(r));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
final Set<Relationship> set = new HashSet<>();
|
final Set<Relationship> set = new HashSet<>();
|
||||||
|
@ -249,6 +271,10 @@ public class RouteText extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||||
|
if (descriptor.equals(IGNORE_CASE) && !newValue.equals(oldValue)) {
|
||||||
|
patternsCache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
if (descriptor.equals(ROUTE_STRATEGY)) {
|
if (descriptor.equals(ROUTE_STRATEGY)) {
|
||||||
configuredRouteStrategy = newValue;
|
configuredRouteStrategy = newValue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -384,11 +410,7 @@ public class RouteText extends AbstractProcessor {
|
||||||
for (final Map.Entry<Relationship, PropertyValue> entry : propMap.entrySet()) {
|
for (final Map.Entry<Relationship, PropertyValue> entry : propMap.entrySet()) {
|
||||||
final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue();
|
final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue();
|
||||||
|
|
||||||
Pattern compiledRegex = null;
|
propValueMap.put(entry.getKey(), compileRegex ? cachedCompiledPattern(value, ignoreCase) : value);
|
||||||
if (compileRegex) {
|
|
||||||
compiledRegex = ignoreCase ? Pattern.compile(value, Pattern.CASE_INSENSITIVE) : Pattern.compile(value);
|
|
||||||
}
|
|
||||||
propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -435,7 +457,7 @@ public class RouteText extends AbstractProcessor {
|
||||||
|
|
||||||
int propertiesThatMatchedLine = 0;
|
int propertiesThatMatchedLine = 0;
|
||||||
for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) {
|
for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) {
|
||||||
boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase, originalFlowFile, variables);
|
boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), matchStrategy, ignoreCase, originalFlowFile, variables);
|
||||||
if (lineMatchesProperty) {
|
if (lineMatchesProperty) {
|
||||||
propertiesThatMatchedLine++;
|
propertiesThatMatchedLine++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
@ -763,6 +764,40 @@ public class TestRouteText {
|
||||||
outOriginal.assertContentEquals(Paths.get("src/test/resources/TestXml/XmlBundle.xsd"));
|
outOriginal.assertContentEquals(Paths.get("src/test/resources/TestXml/XmlBundle.xsd"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatternCache() throws IOException {
|
||||||
|
final RouteText routeText = new RouteText();
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(routeText);
|
||||||
|
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.MATCHES_REGULAR_EXPRESSION);
|
||||||
|
runner.setProperty("simple", ".*(${someValue}).*");
|
||||||
|
|
||||||
|
runner.enqueue("some text", ImmutableMap.of("someValue", "a value"));
|
||||||
|
runner.enqueue("some other text", ImmutableMap.of("someValue", "a value"));
|
||||||
|
runner.run(2);
|
||||||
|
|
||||||
|
assertEquals("Expected 1 elements in the cache for the patterns, got" +
|
||||||
|
routeText.patternsCache.size(), 1, routeText.patternsCache.size());
|
||||||
|
|
||||||
|
for (int i = 0; i < RouteText.PATTERNS_CACHE_MAXIMUM_ENTRIES * 2; ++i) {
|
||||||
|
String iString = Long.toString(i);
|
||||||
|
runner.enqueue("some text with " + iString + "in it",
|
||||||
|
ImmutableMap.of("someValue", iString));
|
||||||
|
runner.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals("Expected " + RouteText.PATTERNS_CACHE_MAXIMUM_ENTRIES +
|
||||||
|
" elements in the cache for the patterns, got" + routeText.patternsCache.size(),
|
||||||
|
RouteText.PATTERNS_CACHE_MAXIMUM_ENTRIES, routeText.patternsCache.size());
|
||||||
|
|
||||||
|
runner.assertTransferCount("simple", RouteText.PATTERNS_CACHE_MAXIMUM_ENTRIES * 2);
|
||||||
|
runner.assertTransferCount("unmatched", 2);
|
||||||
|
runner.assertTransferCount("original", RouteText.PATTERNS_CACHE_MAXIMUM_ENTRIES * 2 + 2);
|
||||||
|
|
||||||
|
runner.setProperty(RouteText.IGNORE_CASE, "true");
|
||||||
|
assertEquals("Pattern cache is not cleared after changing IGNORE_CASE", 0, routeText.patternsCache.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static int countLines(String str) {
|
public static int countLines(String str) {
|
||||||
if (str == null || str.isEmpty()) {
|
if (str == null || str.isEmpty()) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue