diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java index 4b7023983c..f1d06a6a26 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java @@ -107,7 +107,6 @@ public class ScanAttribute extends AbstractProcessor { .addValidator(StandardValidators.createRegexValidator(0, 1, false)) .defaultValue(null) .build(); - public static final PropertyDescriptor DICTIONARY_ENTRY_METADATA_DEMARCATOR = new PropertyDescriptor.Builder() .name("Dictionary Entry Metadata Demarcator") .description("A single character used to demarcate the dictionary entry string between dictionary value and metadata.") @@ -115,14 +114,14 @@ public class ScanAttribute extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue(null) .build(); - + private List properties; private Set relationships; private volatile Pattern dictionaryFilterPattern = null; private volatile Pattern attributePattern = null; private volatile String dictionaryEntryMetadataDemarcator = null; - private volatile Map dictionaryTerms = null; + private volatile Map dictionaryTerms = null; private volatile Set attributeNameMatches = null; private volatile SynchronousFileWatcher fileWatcher = null; @@ -144,7 +143,7 @@ public class ScanAttribute extends AbstractProcessor { properties.add(MATCHING_CRITERIA); properties.add(DICTIONARY_FILTER); properties.add(DICTIONARY_ENTRY_METADATA_DEMARCATOR); - + this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -173,7 +172,7 @@ public class ScanAttribute extends AbstractProcessor { this.dictionaryTerms = createDictionary(context); this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L); - + this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue(); } @@ -184,8 +183,8 @@ public class ScanAttribute extends AbstractProcessor { String[] termMeta; String term; String meta; - - + + final File file = new File(context.getProperty(DICTIONARY_FILE).getValue()); try (final InputStream fis = new FileInputStream(file); final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { @@ -195,19 +194,16 @@ public class ScanAttribute extends AbstractProcessor { if (line.trim().isEmpty()) { continue; } - - if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator)) - { - termMeta = line.split(dictionaryEntryMetadataDemarcator); - term = termMeta[0]; - meta = termMeta[1]; + + if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator)) { + termMeta = line.split(dictionaryEntryMetadataDemarcator); + term = termMeta[0]; + meta = termMeta[1]; + } else { + term=line; + meta=""; } - else - { - term=line; - meta=""; - } - + String matchingTerm = term; if (dictionaryFilterPattern != null) { final Matcher matcher = dictionaryFilterPattern.matcher(term); @@ -248,9 +244,9 @@ public class ScanAttribute extends AbstractProcessor { final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL); for (FlowFile flowFile : flowFiles) { - final Map matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms)); - flowFile = session.putAllAttributes(flowFile, matched); - + final Map matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms)); + flowFile = session.putAllAttributes(flowFile, matched); + final Relationship relationship = (((matched.size() == (attributeNameMatches.size() * 3) && matchAll) || (matched.size() > 0 && !matchAll))) ? REL_MATCHED : REL_UNMATCHED; session.getProvenanceReporter().route(flowFile, relationship); session.transfer(flowFile, relationship); @@ -258,48 +254,46 @@ public class ScanAttribute extends AbstractProcessor { } } - private Map matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { - Map dictionaryTermMatches = new HashMap(); - attributeNameMatches = new HashSet(); + private Map matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { + Map dictionaryTermMatches = new HashMap(); + attributeNameMatches = new HashSet(); + + int hitCounter = 0; - int hitCounter = 0; - for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) { if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { - attributeNameMatches.add(attribute.getKey()); - + attributeNameMatches.add(attribute.getKey()); + if (dictionary.containsKey(attribute.getValue())) { - hitCounter++; - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); + hitCounter++; + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); } } } return dictionaryTermMatches; } - - private Map matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { - Map dictionaryTermMatches = new HashMap(); - attributeNameMatches = new HashSet(); - int hitCounter = 0; - + private Map matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map dictionary) { + Map dictionaryTermMatches = new HashMap(); + attributeNameMatches = new HashSet(); + + int hitCounter = 0; + for (final Map.Entry attribute : flowFile.getAttributes().entrySet()) { if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { - attributeNameMatches.add(attribute.getKey()); + attributeNameMatches.add(attribute.getKey()); if (dictionary.containsKey(attribute.getValue())) { - hitCounter++; - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); - dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); - } - else - { - //if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario. - dictionaryTermMatches.clear(); - break; + hitCounter++; + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); + dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); + } else { + //if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario. + dictionaryTermMatches.clear(); + break; } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java index 53aecf357c..767b2ec73b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanAttribute.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.processors.standard; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.HashMap; import java.util.Map; @@ -148,12 +148,12 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.clearTransferState(); } - + @Test public void testSingleMatchWithMeta() { final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); FlowFile f; - + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); @@ -165,19 +165,19 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); - + runner.clearTransferState(); attributes.remove("produce_name"); runner.enqueue(new byte[0], attributes); runner.run(); - + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); runner.clearTransferState(); @@ -188,10 +188,10 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"cherry"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red fruit"); runner.clearTransferState(); @@ -208,27 +208,27 @@ public class TestScanAttribute { public void testAllMatchWithMeta() { final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); FlowFile f; - + runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); runner.setProperty(ScanAttribute.MATCHING_CRITERIA, ScanAttribute.MATCH_CRITERIA_ALL); runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "pro.*"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); - + final Map attributes = new HashMap<>(); attributes.put("produce_name", "banana"); runner.enqueue(new byte[0], attributes); runner.run(); - + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); - + runner.clearTransferState(); attributes.remove("produce_name"); @@ -249,18 +249,18 @@ public class TestScanAttribute { runner.enqueue(new byte[0], attributes); runner.run(); - + runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.2.term") ,"corn"); assertEquals(f.getAttribute("dictionary.hit.2.metadata"), "yellow vegetable"); } @@ -270,7 +270,7 @@ public class TestScanAttribute { final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); - + final Map attributes = new HashMap<>(); attributes.put("produce_name", ""); @@ -294,7 +294,7 @@ public class TestScanAttribute { runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-extra-info_meta"); runner.setProperty(ScanAttribute.DICTIONARY_FILTER, "(.*)"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); - + final Map attributes = new HashMap<>(); attributes.put("produce_name", "banana"); @@ -303,10 +303,10 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); runner.clearTransferState(); @@ -323,13 +323,13 @@ public class TestScanAttribute { runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); - + runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); - + assertEquals(f.getAttribute("dictionary.hit.1.term") ,"tomatoe"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red vegetable"); - + runner.clearTransferState(); }