NIFI-3497 - Added metadata option

Added an option to post additional metadata as new attributes if a match is found in the dictionary.
This commit is contained in:
Joe Trite 2017-02-22 16:36:08 -05:00
parent 5990db39ae
commit de7e348e62
1 changed files with 97 additions and 30 deletions

View File

@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -37,6 +38,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -60,6 +63,13 @@ import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
@Tags({"scan", "attributes", "search", "lookup"})
@CapabilityDescription("Scans the specified attributes of FlowFiles, checking to see if any of their values are "
+ "present within the specified dictionary of terms")
@WritesAttributes({
@WritesAttribute(attribute = "dictionary.hit.{n}.attribute", description = "The attribute name that had a value hit on the dictionary file."),
@WritesAttribute(attribute = "dictionary.hit.{n}.term", description = "The term that had a hit on the dictionary file."),
@WritesAttribute(attribute = "dictionary.hit.{n}.metadata", description = "The metadata returned from the dictionary file associated with the term hit.")
})
public class ScanAttribute extends AbstractProcessor {
public static final String MATCH_CRITERIA_ALL = "All Must Match";
@ -97,13 +107,24 @@ public class ScanAttribute extends AbstractProcessor {
.addValidator(StandardValidators.createRegexValidator(0, 1, false))
.defaultValue(null)
.build();
/**
 * Optional property holding the demarcator that separates a dictionary term from its metadata
 * within a single dictionary entry. It has no default value and is not required.
 *
 * NOTE(review): the description asks for "a single character", but the only validator attached
 * is NON_EMPTY_VALIDATOR, which by its name does not appear to limit the length - confirm.
 * NOTE(review): createDictionary passes this value to String.split, which interprets its
 * argument as a regular expression, so a demarcator such as "|" or "." would not be matched
 * literally - confirm whether that is intended.
 */
public static final PropertyDescriptor DICTIONARY_ENTRY_METADATA_DEMARCATOR = new PropertyDescriptor.Builder()
.name("Dictionary Entry Metadata Demarcator")
.description("A single character used to demarcate the dictionary entry string between dictionary value and metadata.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue(null)
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private volatile Pattern dictionaryFilterPattern = null;
private volatile Pattern attributePattern = null;
private volatile Set<String> dictionaryTerms = null;
private volatile String dictionaryEntryMetadataDemarcator = null;
private volatile Map<String,String> dictionaryTerms = null;
private volatile Set<String> attributeNameMatches = null;
private volatile SynchronousFileWatcher fileWatcher = null;
public static final Relationship REL_MATCHED = new Relationship.Builder()
@ -122,6 +143,8 @@ public class ScanAttribute extends AbstractProcessor {
properties.add(ATTRIBUTE_PATTERN);
properties.add(MATCHING_CRITERIA);
properties.add(DICTIONARY_FILTER);
properties.add(DICTIONARY_ENTRY_METADATA_DEMARCATOR);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
@ -150,11 +173,19 @@ public class ScanAttribute extends AbstractProcessor {
this.dictionaryTerms = createDictionary(context);
this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L);
this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue();
}
private Set<String> createDictionary(final ProcessContext context) throws IOException {
final Set<String> terms = new HashSet<>();
private Map<String,String> createDictionary(final ProcessContext context) throws IOException {
final Map<String,String> termsMeta = new HashMap<String, String>();
this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue();
String[] termMeta;
String term;
String meta;
final File file = new File(context.getProperty(DICTIONARY_FILE).getValue());
try (final InputStream fis = new FileInputStream(file);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
@ -164,10 +195,22 @@ public class ScanAttribute extends AbstractProcessor {
if (line.trim().isEmpty()) {
continue;
}
String matchingTerm = line;
if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator))
{
termMeta = line.split(dictionaryEntryMetadataDemarcator);
term = termMeta[0];
meta = termMeta[1];
}
else
{
term=line;
meta="";
}
String matchingTerm = term;
if (dictionaryFilterPattern != null) {
final Matcher matcher = dictionaryFilterPattern.matcher(line);
final Matcher matcher = dictionaryFilterPattern.matcher(term);
if (!matcher.matches()) {
continue;
}
@ -177,20 +220,18 @@ public class ScanAttribute extends AbstractProcessor {
if (matcher.groupCount() == 1) {
matchingTerm = matcher.group(1);
} else {
matchingTerm = line;
matchingTerm = term;
}
}
terms.add(matchingTerm);
termsMeta.put(matchingTerm, meta);
}
}
return Collections.unmodifiableSet(terms);
return Collections.unmodifiableMap(termsMeta);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(50);
List<FlowFile> flowFiles = session.get(50);
if (flowFiles.isEmpty()) {
return;
}
@ -206,36 +247,62 @@ public class ScanAttribute extends AbstractProcessor {
final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL);
for (final FlowFile flowFile : flowFiles) {
final boolean matched = matchAll ? allMatch(flowFile, attributePattern, dictionaryTerms) : anyMatch(flowFile, attributePattern, dictionaryTerms);
final Relationship relationship = matched ? REL_MATCHED : REL_UNMATCHED;
for (FlowFile flowFile : flowFiles) {
final Map<String,String> matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms));
flowFile = session.putAllAttributes(flowFile, matched);
final Relationship relationship = (((matched.size() == (attributeNameMatches.size() * 3) && matchAll) || (matched.size() > 0 && !matchAll))) ? REL_MATCHED : REL_UNMATCHED;
session.getProvenanceReporter().route(flowFile, relationship);
session.transfer(flowFile, relationship);
logger.info("Transferred {} to {}", new Object[]{flowFile, relationship});
}
}
private boolean allMatch(final FlowFile flowFile, final Pattern attributePattern, final Set<String> dictionary) {
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) {
if (!dictionary.contains(entry.getValue())) {
return false;
/**
 * Scans the FlowFile's attributes and records every dictionary hit.
 *
 * Each attribute whose name matches {@code attributePattern} (or every attribute when the
 * pattern is {@code null}) is added to the {@code attributeNameMatches} field, which is
 * replaced with a fresh set at the start of the call. When the attribute's value is a key of
 * {@code dictionary}, three entries are added to the result:
 * {@code dictionary.hit.{n}.attribute} (the attribute name),
 * {@code dictionary.hit.{n}.term} (the attribute value) and
 * {@code dictionary.hit.{n}.metadata} (the dictionary value stored for that term),
 * where {@code n} counts hits starting at 1.
 *
 * @param flowFile the FlowFile whose attributes are examined
 * @param attributePattern regex applied to attribute names; {@code null} selects every attribute
 * @param dictionary dictionary terms mapped to their metadata
 * @return the hit entries; empty when no examined attribute value is in the dictionary
 */
private Map<String,String> matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map<String,String> dictionary) {
Map<String,String> dictionaryTermMatches = new HashMap<String,String>();
// NOTE(review): attributeNameMatches is an instance field reassigned on every call, and
// onTrigger reads its size after this method returns - confirm this is safe if the
// processor is scheduled with more than one concurrent task.
attributeNameMatches = new HashSet<String>();
// 1-based index used to number the dictionary.hit.{n}.* entries.
int hitCounter = 0;
for (final Map.Entry<String, String> attribute : flowFile.getAttributes().entrySet()) {
// A null pattern means no name filtering: every attribute is examined.
if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) {
attributeNameMatches.add(attribute.getKey());
if (dictionary.containsKey(attribute.getValue())) {
hitCounter++;
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue()));
}
}
}
// NOTE(review): the next line looks like residue of the replaced boolean allMatch method;
// it does not fit this method's Map return type - confirm.
return true;
return dictionaryTermMatches;
}
/**
 * Scans the FlowFile's attributes and records dictionary hits, giving up at the first miss.
 *
 * Each attribute whose name matches {@code attributePattern} (or every attribute when the
 * pattern is {@code null}) is added to the {@code attributeNameMatches} field, which is
 * replaced with a fresh set at the start of the call. For every such attribute whose value is a
 * key of {@code dictionary}, three entries are added to the result:
 * {@code dictionary.hit.{n}.attribute}, {@code dictionary.hit.{n}.term} and
 * {@code dictionary.hit.{n}.metadata}, where {@code n} counts hits starting at 1. As soon as an
 * examined attribute value is not in the dictionary, the result is cleared and iteration stops,
 * so {@code attributeNameMatches} then holds only the names examined up to that point.
 *
 * @param flowFile the FlowFile whose attributes are examined
 * @param attributePattern regex applied to attribute names; {@code null} selects every attribute
 * @param dictionary dictionary terms mapped to their metadata
 * @return the hit entries when every examined attribute value is in the dictionary, otherwise
 *         an empty map
 */
private Map<String,String> matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map<String,String> dictionary) {
Map<String,String> dictionaryTermMatches = new HashMap<String,String>();
// NOTE(review): attributeNameMatches is an instance field reassigned on every call, and
// onTrigger compares its size with the returned map - confirm this is safe if the
// processor is scheduled with more than one concurrent task.
attributeNameMatches = new HashSet<String>();
// NOTE(review): the next five lines look like residue of the replaced boolean anyMatch
// method (Set-based dictionary, boolean return) - confirm.
private boolean anyMatch(final FlowFile flowFile, final Pattern attributePattern, final Set<String> dictionary) {
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) {
if (dictionary.contains(entry.getValue())) {
return true;
// 1-based index used to number the dictionary.hit.{n}.* entries.
int hitCounter = 0;
for (final Map.Entry<String, String> attribute : flowFile.getAttributes().entrySet()) {
// A null pattern means no name filtering: every attribute is examined.
if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) {
attributeNameMatches.add(attribute.getKey());
if (dictionary.containsKey(attribute.getValue())) {
hitCounter++;
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue()));
}
else
{
//if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario.
dictionaryTermMatches.clear();
break;
}
}
}
// NOTE(review): the next line looks like residue of the replaced boolean anyMatch method;
// it does not fit this method's Map return type - confirm.
return false;
return dictionaryTermMatches;
}
}