ScanAttribute: emit dictionary-hit attributes and optional per-term metadata.

Each dictionary line may now carry metadata after an optional "Dictionary
Entry Metadata Demarcator". The dictionary becomes a term -> metadata map, and
every hit is written to the FlowFile as dictionary.hit.{n}.attribute / .term /
.metadata before routing to matched / unmatched.

Review fix folded into this patch (createDictionary):
  * The demarcator is literal text, but String.split() takes a regular
    expression. A demarcator such as "|" or "." was interpreted as regex and
    produced wrong terms or an empty array. It is now quoted with
    Pattern.quote().
  * The split is limited to two parts, so a line ending in the demarcator
    yields empty metadata instead of ArrayIndexOutOfBoundsException, and
    metadata containing the demarcator is kept intact.

NOTE(review): matchAny()/matchAll() reassign the instance field
attributeNameMatches for every FlowFile and onTrigger() reads it afterwards.
If onTrigger() can run on more than one thread this is a race -- confirm, and
consider returning the matched attribute names instead of sharing a field.

NOTE(review): the "index" line below still carries the blob ids of the
original patch; the post-image id no longer corresponds to this content.

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
index cd3d36ad6b..4b7023983c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -60,6 +63,13 @@ import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
 @Tags({"scan", "attributes", "search", "lookup"})
 @CapabilityDescription("Scans the specified attributes of FlowFiles, checking to see if any of their values are "
         + "present within the specified dictionary of terms")
+@WritesAttributes({
+    @WritesAttribute(attribute = "dictionary.hit.{n}.attribute", description = "The attribute name that had a value hit on the dictionary file."),
+    @WritesAttribute(attribute = "dictionary.hit.{n}.term", description = "The term that had a hit on the dictionary file."),
+    @WritesAttribute(attribute = "dictionary.hit.{n}.metadata", description = "The metadata returned from the dictionary file associated with the term hit.")
+})
+
+
 public class ScanAttribute extends AbstractProcessor {
 
     public static final String MATCH_CRITERIA_ALL = "All Must Match";
@@ -97,13 +107,24 @@ public class ScanAttribute extends AbstractProcessor {
             .addValidator(StandardValidators.createRegexValidator(0, 1, false))
             .defaultValue(null)
             .build();
-
+
+    public static final PropertyDescriptor DICTIONARY_ENTRY_METADATA_DEMARCATOR = new PropertyDescriptor.Builder()
+            .name("Dictionary Entry Metadata Demarcator")
+            .description("A single character used to demarcate the dictionary entry string between dictionary value and metadata.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue(null)
+            .build();
+
     private List properties;
     private Set relationships;
 
     private volatile Pattern dictionaryFilterPattern = null;
     private volatile Pattern attributePattern = null;
-    private volatile Set dictionaryTerms = null;
+    private volatile String dictionaryEntryMetadataDemarcator = null;
+    private volatile Map dictionaryTerms = null;
+    private volatile Set attributeNameMatches = null;
+
     private volatile SynchronousFileWatcher fileWatcher = null;
 
     public static final Relationship REL_MATCHED = new Relationship.Builder()
@@ -122,6 +143,8 @@ public class ScanAttribute extends AbstractProcessor {
         properties.add(ATTRIBUTE_PATTERN);
         properties.add(MATCHING_CRITERIA);
         properties.add(DICTIONARY_FILTER);
+        properties.add(DICTIONARY_ENTRY_METADATA_DEMARCATOR);
+
         this.properties = Collections.unmodifiableList(properties);
 
         final Set relationships = new HashSet<>();
@@ -150,11 +173,19 @@ public class ScanAttribute extends AbstractProcessor {
 
         this.dictionaryTerms = createDictionary(context);
         this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L);
+
+        this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue();
     }
 
-    private Set createDictionary(final ProcessContext context) throws IOException {
-        final Set terms = new HashSet<>();
+    private Map createDictionary(final ProcessContext context) throws IOException {
+        final Map termsMeta = new HashMap();
+        this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue();
+        String[] termMeta;
+        String term;
+        String meta;
+
+
 
         final File file = new File(context.getProperty(DICTIONARY_FILE).getValue());
         try (final InputStream fis = new FileInputStream(file);
                 final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
@@ -164,10 +195,23 @@ public class ScanAttribute extends AbstractProcessor {
                 if (line.trim().isEmpty()) {
                     continue;
                 }
-
-                String matchingTerm = line;
+
+                if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator))
+                {
+                    // split() takes a regex: quote the literal demarcator, and split once so termMeta always has two parts.
+                    termMeta = line.split(Pattern.quote(dictionaryEntryMetadataDemarcator), 2);
+                    term = termMeta[0];
+                    meta = termMeta[1];
+                }
+                else
+                {
+                    term=line;
+                    meta="";
+                }
+
+                String matchingTerm = term;
                 if (dictionaryFilterPattern != null) {
-                    final Matcher matcher = dictionaryFilterPattern.matcher(line);
+                    final Matcher matcher = dictionaryFilterPattern.matcher(term);
                     if (!matcher.matches()) {
                         continue;
                     }
@@ -177,20 +221,18 @@ public class ScanAttribute extends AbstractProcessor {
                     if (matcher.groupCount() == 1) {
                         matchingTerm = matcher.group(1);
                     } else {
-                        matchingTerm = line;
+                        matchingTerm = term;
                     }
                 }
-
-                terms.add(matchingTerm);
+                termsMeta.put(matchingTerm, meta);
             }
         }
-
-        return Collections.unmodifiableSet(terms);
+        return Collections.unmodifiableMap(termsMeta);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final List flowFiles = session.get(50);
+        List flowFiles = session.get(50);
         if (flowFiles.isEmpty()) {
             return;
         }
@@ -206,36 +248,62 @@ public class ScanAttribute extends AbstractProcessor {
 
         final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL);
 
-        for (final FlowFile flowFile : flowFiles) {
-            final boolean matched = matchAll ? allMatch(flowFile, attributePattern, dictionaryTerms) : anyMatch(flowFile, attributePattern, dictionaryTerms);
-            final Relationship relationship = matched ? REL_MATCHED : REL_UNMATCHED;
+        for (FlowFile flowFile : flowFiles) {
+            final Map matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms));
+            flowFile = session.putAllAttributes(flowFile, matched);
+
+            final Relationship relationship = (((matched.size() == (attributeNameMatches.size() * 3) && matchAll) || (matched.size() > 0 && !matchAll))) ? REL_MATCHED : REL_UNMATCHED;
             session.getProvenanceReporter().route(flowFile, relationship);
             session.transfer(flowFile, relationship);
             logger.info("Transferred {} to {}", new Object[]{flowFile, relationship});
         }
     }
 
-    private boolean allMatch(final FlowFile flowFile, final Pattern attributePattern, final Set dictionary) {
-        for (final Map.Entry entry : flowFile.getAttributes().entrySet()) {
-            if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) {
-                if (!dictionary.contains(entry.getValue())) {
-                    return false;
+    private Map matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) {
+        Map dictionaryTermMatches = new HashMap();
+        attributeNameMatches = new HashSet();
+
+        int hitCounter = 0;
+
+        for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) {
+            if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) {
+                attributeNameMatches.add(attribute.getKey());
+
+                if (dictionary.containsKey(attribute.getValue())) {
+                    hitCounter++;
+                    dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey());
+                    dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue());
+                    dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue()));
                 }
             }
         }
-
-        return true;
+        return dictionaryTermMatches;
     }
+
+    private Map matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) {
+        Map dictionaryTermMatches = new HashMap();
+        attributeNameMatches = new HashSet();
 
-    private boolean anyMatch(final FlowFile flowFile, final Pattern attributePattern, final Set dictionary) {
-        for (final Map.Entry entry : flowFile.getAttributes().entrySet()) {
-            if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) {
-                if (dictionary.contains(entry.getValue())) {
-                    return true;
+        int hitCounter = 0;
+
+        for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) {
+            if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) {
+                attributeNameMatches.add(attribute.getKey());
+
+                if (dictionary.containsKey(attribute.getValue())) {
+                    hitCounter++;
+                    dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey());
+                    dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue());
+                    dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue()));
+                }
+                else
+                {
+                    //if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario.
+                    dictionaryTermMatches.clear();
+                    break;
                 }
             }
         }
-
-        return false;
+        return dictionaryTermMatches;
     }
 }