From f52e1f2a064b31f87d4165af6075716fa7d55046 Mon Sep 17 00:00:00 2001 From: Joe Trite Date: Wed, 22 Feb 2017 16:36:08 -0500 Subject: [PATCH 1/5] NIFI-3497 - Added metadata option Added an option to post additional metadata as new attributes if a match is found in the dictionary. --- .../processors/standard/ScanAttribute.java | 127 +++++++++++++----- 1 file changed, 97 insertions(+), 30 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java index cd3d36ad6b..4b7023983c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java @@ -25,6 +25,7 @@ import java.io.InputStreamReader; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -37,6 +38,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -60,6 +63,13 @@ import org.apache.nifi.util.file.monitor.SynchronousFileWatcher; @Tags({"scan", "attributes", "search", "lookup"}) @CapabilityDescription("Scans the specified attributes of 
FlowFiles, checking to see if any of their values are " + "present within the specified dictionary of terms") +@WritesAttributes({ + @WritesAttribute(attribute = "dictionary.hit.{n}.attribute", description = "The attribute name that had a value hit on the dictionary file."), + @WritesAttribute(attribute = "dictionary.hit.{n}.term", description = "The term that had a hit on the dictionary file."), + @WritesAttribute(attribute = "dictionary.hit.{n}.metadata", description = "The metadata returned from the dictionary file associated with the term hit.") +}) + + public class ScanAttribute extends AbstractProcessor { public static final String MATCH_CRITERIA_ALL = "All Must Match"; @@ -97,13 +107,24 @@ public class ScanAttribute extends AbstractProcessor { .addValidator(StandardValidators.createRegexValidator(0, 1, false)) .defaultValue(null) .build(); - + + public static final PropertyDescriptor DICTIONARY_ENTRY_METADATA_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Dictionary Entry Metadata Demarcator") + .description("A single character used to demarcate the dictionary entry string between dictionary value and metadata.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue(null) + .build(); + private List properties; private Set relationships; private volatile Pattern dictionaryFilterPattern = null; private volatile Pattern attributePattern = null; - private volatile Set dictionaryTerms = null; + private volatile String dictionaryEntryMetadataDemarcator = null; + private volatile Map dictionaryTerms = null; + private volatile Set attributeNameMatches = null; + private volatile SynchronousFileWatcher fileWatcher = null; public static final Relationship REL_MATCHED = new Relationship.Builder() @@ -122,6 +143,8 @@ public class ScanAttribute extends AbstractProcessor { properties.add(ATTRIBUTE_PATTERN); properties.add(MATCHING_CRITERIA); properties.add(DICTIONARY_FILTER); + 
properties.add(DICTIONARY_ENTRY_METADATA_DEMARCATOR); + this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -150,11 +173,19 @@ public class ScanAttribute extends AbstractProcessor { this.dictionaryTerms = createDictionary(context); this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L); + + this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue(); } - private Set createDictionary(final ProcessContext context) throws IOException { - final Set terms = new HashSet<>(); + private Map createDictionary(final ProcessContext context) throws IOException { + final Map termsMeta = new HashMap(); + this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue(); + String[] termMeta; + String term; + String meta; + + final File file = new File(context.getProperty(DICTIONARY_FILE).getValue()); try (final InputStream fis = new FileInputStream(file); final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { @@ -164,10 +195,22 @@ public class ScanAttribute extends AbstractProcessor { if (line.trim().isEmpty()) { continue; } - - String matchingTerm = line; + + if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator)) + { + termMeta = line.split(dictionaryEntryMetadataDemarcator); + term = termMeta[0]; + meta = termMeta[1]; + } + else + { + term=line; + meta=""; + } + + String matchingTerm = term; if (dictionaryFilterPattern != null) { - final Matcher matcher = dictionaryFilterPattern.matcher(line); + final Matcher matcher = dictionaryFilterPattern.matcher(term); if (!matcher.matches()) { continue; } @@ -177,20 +220,18 @@ public class ScanAttribute extends AbstractProcessor { if (matcher.groupCount() == 1) { matchingTerm = matcher.group(1); } else { - matchingTerm = line; + matchingTerm = 
term; } } - - terms.add(matchingTerm); + termsMeta.put(matchingTerm, meta); } } - - return Collections.unmodifiableSet(terms); + return Collections.unmodifiableMap(termsMeta); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final List flowFiles = session.get(50); + List flowFiles = session.get(50); if (flowFiles.isEmpty()) { return; } @@ -206,36 +247,62 @@ public class ScanAttribute extends AbstractProcessor { final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL); - for (final FlowFile flowFile : flowFiles) { - final boolean matched = matchAll ? allMatch(flowFile, attributePattern, dictionaryTerms) : anyMatch(flowFile, attributePattern, dictionaryTerms); - final Relationship relationship = matched ? REL_MATCHED : REL_UNMATCHED; + for (FlowFile flowFile : flowFiles) { + final Map matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms)); + flowFile = session.putAllAttributes(flowFile, matched); + + final Relationship relationship = (((matched.size() == (attributeNameMatches.size() * 3) && matchAll) || (matched.size() > 0 && !matchAll))) ? 
REL_MATCHED : REL_UNMATCHED; session.getProvenanceReporter().route(flowFile, relationship); session.transfer(flowFile, relationship); logger.info("Transferred {} to {}", new Object[]{flowFile, relationship}); } } - private boolean allMatch(final FlowFile flowFile, final Pattern attributePattern, final Set dictionary) { - for (final Map.Entry entry : flowFile.getAttributes().entrySet()) { - if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) { - if (!dictionary.contains(entry.getValue())) { - return false; + private Map matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { + Map dictionaryTermMatches = new HashMap(); + attributeNameMatches = new HashSet(); + + int hitCounter = 0; + + for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) { + if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { + attributeNameMatches.add(attribute.getKey()); + + if (dictionary.containsKey(attribute.getValue())) { + hitCounter++; + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); + dictionaryTermMatches.put("dictionary.hit." 
+ hitCounter + ".metadata", dictionary.get(attribute.getValue())); } } } - - return true; + return dictionaryTermMatches; } + + private Map matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { + Map dictionaryTermMatches = new HashMap(); + attributeNameMatches = new HashSet(); - private boolean anyMatch(final FlowFile flowFile, final Pattern attributePattern, final Set dictionary) { - for (final Map.Entry entry : flowFile.getAttributes().entrySet()) { - if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) { - if (dictionary.contains(entry.getValue())) { - return true; + int hitCounter = 0; + + for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) { + if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { + attributeNameMatches.add(attribute.getKey()); + + if (dictionary.containsKey(attribute.getValue())) { + hitCounter++; + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); + } + else + { + //if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario. + dictionaryTermMatches.clear(); + break; } } } - - return false; + return dictionaryTermMatches; } } From 8eb54a50193897cf564eb7d222aae35481168af4 Mon Sep 17 00:00:00 2001 From: Joe Trite Date: Wed, 22 Feb 2017 16:46:13 -0500 Subject: [PATCH 2/5] NIFI-3497 test cases for metadata updates Adding test cases to support metadata option update. 
--- .../standard/TestScanAttribute.java | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java index b4a41369f2..53aecf357c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java @@ -16,9 +16,12 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.*; + import java.util.HashMap; import java.util.Map; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -145,4 +148,189 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.clearTransferState(); } + + @Test + public void testSingleMatchWithMeta() { + final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); + FlowFile f; + + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); + runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); + + final Map attributes = new HashMap<>(); + attributes.put("produce_name", "banana"); + + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); + f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); + + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); + + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); + 
assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); + + runner.clearTransferState(); + + attributes.remove("produce_name"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); + runner.clearTransferState(); + + attributes.put("produce_name", "cherry"); + runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "pro.*"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); + f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); + + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); + + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"cherry"); + assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red fruit"); + runner.clearTransferState(); + + runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "c.*"); + runner.enqueue(new byte[0], attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); + runner.clearTransferState(); + + } + + @Test + public void testAllMatchWithMeta() { + final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); + FlowFile f; + + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); + runner.setProperty(ScanAttribute.MATCHING_CRITERIA, ScanAttribute.MATCH_CRITERIA_ALL); + runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "pro.*"); + runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); + + final Map attributes = new HashMap<>(); + attributes.put("produce_name", "banana"); + + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); + f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); + + 
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); + + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); + assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); + + runner.clearTransferState(); + + attributes.remove("produce_name"); + runner.enqueue(new byte[0], attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); + runner.clearTransferState(); + + attributes.put("produce_name", "banana"); + attributes.put("produce_name_2", "french fries"); + runner.enqueue(new byte[0], attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); + runner.clearTransferState(); + + attributes.put("produce_name", "corn"); + attributes.put("produce_name_2", "banana"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); + f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); + + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); + assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); + + runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.term"); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.metadata"); + + assertEquals(f.getAttribute("dictionary.hit.2.term") ,"corn"); + assertEquals(f.getAttribute("dictionary.hit.2.metadata"), "yellow vegetable"); + } + + @Test + public void testWithEmptyEntriesWithMeta() { + final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta"); + runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, 
":"); + + final Map attributes = new HashMap<>(); + attributes.put("produce_name", ""); + + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); + runner.clearTransferState(); + + runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "pro.*"); + runner.enqueue(new byte[0], attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); + } + + @Test + public void testWithDictionaryFilterWithMeta() { + final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); + FlowFile f; + + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-extra-info_meta"); + runner.setProperty(ScanAttribute.DICTIONARY_FILTER, "(.*)"); + runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); + + final Map attributes = new HashMap<>(); + attributes.put("produce_name", "banana"); + + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); + f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); + + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); + + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); + assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); + runner.clearTransferState(); + + attributes.put("produce_name", "tomatoe"); + runner.enqueue(new byte[0], attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); + runner.clearTransferState(); + + runner.setProperty(ScanAttribute.DICTIONARY_FILTER, "(.*)<.*>"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); + f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); + + 
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); + + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"tomatoe"); + assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red vegetable"); + + + runner.clearTransferState(); + } } From a7a7b6ace80380416c342809ce95a4f4087bb066 Mon Sep 17 00:00:00 2001 From: Joe Trite Date: Wed, 22 Feb 2017 16:48:10 -0500 Subject: [PATCH 3/5] NIFI-3497 - New dictionary files for test Adding new dictionary files to support metadata dictionary option. --- .../ScanAttribute/dictionary-with-empty-new-lines_meta | 8 ++++++++ .../ScanAttribute/dictionary-with-extra-info_meta | 9 +++++++++ .../src/test/resources/ScanAttribute/dictionary1_meta | 5 +++++ 3 files changed, 22 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-extra-info_meta create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary1_meta diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta new file mode 100644 index 0000000000..4fbc6b0284 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta @@ -0,0 +1,8 @@ +banana:yellow fruit + + +zucchini:green vegetable + + + + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-extra-info_meta 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-extra-info_meta new file mode 100644 index 0000000000..46c62878b5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary-with-extra-info_meta @@ -0,0 +1,9 @@ +banana:yellow fruit +zucchini:green vegetable +apple:red fruit +lime:green fruit +corn:yellow vegetable +celery:green vegetable +eggplant:purple vegetable +tomatoe:red vegetable +cherry:red fruit \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary1_meta b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary1_meta new file mode 100644 index 0000000000..bc8716786f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ScanAttribute/dictionary1_meta @@ -0,0 +1,5 @@ +banana:yellow fruit +zucchini:green vegetable +lime:green fruit +corn:yellow vegetable +cherry:red fruit \ No newline at end of file From d71426037b142da8ca04dae38952c164d1614806 Mon Sep 17 00:00:00 2001 From: Joe Trite Date: Thu, 23 Feb 2017 10:19:01 -0500 Subject: [PATCH 4/5] NIFI-3497 - excluding test files Adding new test data files to exclude list. --- .../nifi-standard-bundle/nifi-standard-processors/pom.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 5fd9ae24da..bd06632168 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -291,6 +291,9 @@ language governing permissions and limitations under the License. 
--> src/test/resources/ScanAttribute/dictionary-with-empty-new-lines src/test/resources/ScanAttribute/dictionary-with-extra-info src/test/resources/ScanAttribute/dictionary1 + src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta + src/test/resources/ScanAttribute/dictionary-with-extra-info_meta + src/test/resources/ScanAttribute/dictionary1_meta src/test/resources/TestEncryptContent/text.txt src/test/resources/TestEncryptContent/text.txt.asc src/test/resources/TestIdentifyMimeType/1.txt From 89ec68d14bb34cbe65ff9a4d50ff5321fd4ec0ef Mon Sep 17 00:00:00 2001 From: Joe Trite Date: Fri, 24 Feb 2017 18:09:36 -0500 Subject: [PATCH 5/5] NIFI-3497 - fixing Pcontrib issues --- .../processors/standard/ScanAttribute.java | 94 +++++++++---------- .../standard/TestScanAttribute.java | 52 +++++----- 2 files changed, 70 insertions(+), 76 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java index 4b7023983c..f1d06a6a26 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java @@ -107,7 +107,6 @@ public class ScanAttribute extends AbstractProcessor { .addValidator(StandardValidators.createRegexValidator(0, 1, false)) .defaultValue(null) .build(); - public static final PropertyDescriptor DICTIONARY_ENTRY_METADATA_DEMARCATOR = new PropertyDescriptor.Builder() .name("Dictionary Entry Metadata Demarcator") .description("A single character used to demarcate the dictionary entry string between dictionary value and metadata.") @@ -115,14 +114,14 @@ public class ScanAttribute extends AbstractProcessor { 
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue(null) .build(); - + private List properties; private Set relationships; private volatile Pattern dictionaryFilterPattern = null; private volatile Pattern attributePattern = null; private volatile String dictionaryEntryMetadataDemarcator = null; - private volatile Map dictionaryTerms = null; + private volatile Map dictionaryTerms = null; private volatile Set attributeNameMatches = null; private volatile SynchronousFileWatcher fileWatcher = null; @@ -144,7 +143,7 @@ public class ScanAttribute extends AbstractProcessor { properties.add(MATCHING_CRITERIA); properties.add(DICTIONARY_FILTER); properties.add(DICTIONARY_ENTRY_METADATA_DEMARCATOR); - + this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -173,7 +172,7 @@ public class ScanAttribute extends AbstractProcessor { this.dictionaryTerms = createDictionary(context); this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L); - + this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue(); } @@ -184,8 +183,8 @@ public class ScanAttribute extends AbstractProcessor { String[] termMeta; String term; String meta; - - + + final File file = new File(context.getProperty(DICTIONARY_FILE).getValue()); try (final InputStream fis = new FileInputStream(file); final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { @@ -195,19 +194,16 @@ public class ScanAttribute extends AbstractProcessor { if (line.trim().isEmpty()) { continue; } - - if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator)) - { - termMeta = line.split(dictionaryEntryMetadataDemarcator); - term = termMeta[0]; - meta = termMeta[1]; + + if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator)) { + termMeta = 
line.split(dictionaryEntryMetadataDemarcator); + term = termMeta[0]; + meta = termMeta[1]; + } else { + term=line; + meta=""; } - else - { - term=line; - meta=""; - } - + String matchingTerm = term; if (dictionaryFilterPattern != null) { final Matcher matcher = dictionaryFilterPattern.matcher(term); @@ -248,9 +244,9 @@ public class ScanAttribute extends AbstractProcessor { final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL); for (FlowFile flowFile : flowFiles) { - final Map matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms)); - flowFile = session.putAllAttributes(flowFile, matched); - + final Map matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms)); + flowFile = session.putAllAttributes(flowFile, matched); + final Relationship relationship = (((matched.size() == (attributeNameMatches.size() * 3) && matchAll) || (matched.size() > 0 && !matchAll))) ? 
REL_MATCHED : REL_UNMATCHED; session.getProvenanceReporter().route(flowFile, relationship); session.transfer(flowFile, relationship); @@ -258,48 +254,46 @@ public class ScanAttribute extends AbstractProcessor { } } - private Map matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { - Map dictionaryTermMatches = new HashMap(); - attributeNameMatches = new HashSet(); + private Map matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { + Map dictionaryTermMatches = new HashMap(); + attributeNameMatches = new HashSet(); + + int hitCounter = 0; - int hitCounter = 0; - for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) { if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { - attributeNameMatches.add(attribute.getKey()); - + attributeNameMatches.add(attribute.getKey()); + if (dictionary.containsKey(attribute.getValue())) { - hitCounter++; - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); + hitCounter++; + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); + dictionaryTermMatches.put("dictionary.hit." 
+ hitCounter + ".metadata", dictionary.get(attribute.getValue())); } } } return dictionaryTermMatches; } - - private Map matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { - Map dictionaryTermMatches = new HashMap(); - attributeNameMatches = new HashSet(); - int hitCounter = 0; - + private Map matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { + Map dictionaryTermMatches = new HashMap(); + attributeNameMatches = new HashSet(); + + int hitCounter = 0; + for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) { if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { - attributeNameMatches.add(attribute.getKey()); + attributeNameMatches.add(attribute.getKey()); if (dictionary.containsKey(attribute.getValue())) { - hitCounter++; - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); - } - else - { - //if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario. - dictionaryTermMatches.clear(); - break; + hitCounter++; + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); + } else { + //if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario. 
+ dictionaryTermMatches.clear(); + break; } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java index 53aecf357c..767b2ec73b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.processors.standard; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.HashMap; import java.util.Map; @@ -148,12 +148,12 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.clearTransferState(); } - + @Test public void testSingleMatchWithMeta() { final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); FlowFile f; - + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); @@ -165,19 +165,19 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); - + runner.clearTransferState(); attributes.remove("produce_name"); runner.enqueue(new byte[0], attributes); runner.run(); - + 
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); runner.clearTransferState(); @@ -188,10 +188,10 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"cherry"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red fruit"); runner.clearTransferState(); @@ -208,27 +208,27 @@ public class TestScanAttribute { public void testAllMatchWithMeta() { final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); FlowFile f; - + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); runner.setProperty(ScanAttribute.MATCHING_CRITERIA, ScanAttribute.MATCH_CRITERIA_ALL); runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "pro.*"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); - + final Map attributes = new HashMap<>(); attributes.put("produce_name", "banana"); runner.enqueue(new byte[0], attributes); runner.run(); - + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); - + runner.clearTransferState(); attributes.remove("produce_name"); @@ -249,18 +249,18 @@ public class TestScanAttribute { runner.enqueue(new byte[0], attributes); runner.run(); - + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = 
runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.2.term") ,"corn"); assertEquals(f.getAttribute("dictionary.hit.2.metadata"), "yellow vegetable"); } @@ -270,7 +270,7 @@ public class TestScanAttribute { final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); - + final Map attributes = new HashMap<>(); attributes.put("produce_name", ""); @@ -294,7 +294,7 @@ public class TestScanAttribute { runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-extra-info_meta"); runner.setProperty(ScanAttribute.DICTIONARY_FILTER, "(.*)"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); - + final Map attributes = new HashMap<>(); attributes.put("produce_name", "banana"); @@ -303,10 +303,10 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); runner.clearTransferState(); @@ -323,13 +323,13 @@ public class 
TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"tomatoe"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red vegetable"); - + runner.clearTransferState(); }