NIFI-3497 - fixing Pcontrib issues

This commit is contained in:
Joe Trite 2017-02-24 18:09:36 -05:00
parent c5d52cf6f0
commit 65ed46de9a
2 changed files with 70 additions and 76 deletions

View File

@ -107,7 +107,6 @@ public class ScanAttribute extends AbstractProcessor {
.addValidator(StandardValidators.createRegexValidator(0, 1, false)) .addValidator(StandardValidators.createRegexValidator(0, 1, false))
.defaultValue(null) .defaultValue(null)
.build(); .build();
public static final PropertyDescriptor DICTIONARY_ENTRY_METADATA_DEMARCATOR = new PropertyDescriptor.Builder() public static final PropertyDescriptor DICTIONARY_ENTRY_METADATA_DEMARCATOR = new PropertyDescriptor.Builder()
.name("Dictionary Entry Metadata Demarcator") .name("Dictionary Entry Metadata Demarcator")
.description("A single character used to demarcate the dictionary entry string between dictionary value and metadata.") .description("A single character used to demarcate the dictionary entry string between dictionary value and metadata.")
@ -115,14 +114,14 @@ public class ScanAttribute extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue(null) .defaultValue(null)
.build(); .build();
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private Set<Relationship> relationships; private Set<Relationship> relationships;
private volatile Pattern dictionaryFilterPattern = null; private volatile Pattern dictionaryFilterPattern = null;
private volatile Pattern attributePattern = null; private volatile Pattern attributePattern = null;
private volatile String dictionaryEntryMetadataDemarcator = null; private volatile String dictionaryEntryMetadataDemarcator = null;
private volatile Map<String,String> dictionaryTerms = null; private volatile Map<String,String> dictionaryTerms = null;
private volatile Set<String> attributeNameMatches = null; private volatile Set<String> attributeNameMatches = null;
private volatile SynchronousFileWatcher fileWatcher = null; private volatile SynchronousFileWatcher fileWatcher = null;
@ -144,7 +143,7 @@ public class ScanAttribute extends AbstractProcessor {
properties.add(MATCHING_CRITERIA); properties.add(MATCHING_CRITERIA);
properties.add(DICTIONARY_FILTER); properties.add(DICTIONARY_FILTER);
properties.add(DICTIONARY_ENTRY_METADATA_DEMARCATOR); properties.add(DICTIONARY_ENTRY_METADATA_DEMARCATOR);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();
@ -173,7 +172,7 @@ public class ScanAttribute extends AbstractProcessor {
this.dictionaryTerms = createDictionary(context); this.dictionaryTerms = createDictionary(context);
this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L); this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L);
this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue(); this.dictionaryEntryMetadataDemarcator = context.getProperty(DICTIONARY_ENTRY_METADATA_DEMARCATOR).getValue();
} }
@ -184,8 +183,8 @@ public class ScanAttribute extends AbstractProcessor {
String[] termMeta; String[] termMeta;
String term; String term;
String meta; String meta;
final File file = new File(context.getProperty(DICTIONARY_FILE).getValue()); final File file = new File(context.getProperty(DICTIONARY_FILE).getValue());
try (final InputStream fis = new FileInputStream(file); try (final InputStream fis = new FileInputStream(file);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
@ -195,19 +194,16 @@ public class ScanAttribute extends AbstractProcessor {
if (line.trim().isEmpty()) { if (line.trim().isEmpty()) {
continue; continue;
} }
if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator)) if(dictionaryEntryMetadataDemarcator != null && line.contains(dictionaryEntryMetadataDemarcator)) {
{ termMeta = line.split(dictionaryEntryMetadataDemarcator);
termMeta = line.split(dictionaryEntryMetadataDemarcator); term = termMeta[0];
term = termMeta[0]; meta = termMeta[1];
meta = termMeta[1]; } else {
term=line;
meta="";
} }
else
{
term=line;
meta="";
}
String matchingTerm = term; String matchingTerm = term;
if (dictionaryFilterPattern != null) { if (dictionaryFilterPattern != null) {
final Matcher matcher = dictionaryFilterPattern.matcher(term); final Matcher matcher = dictionaryFilterPattern.matcher(term);
@ -248,9 +244,9 @@ public class ScanAttribute extends AbstractProcessor {
final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL); final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL);
for (FlowFile flowFile : flowFiles) { for (FlowFile flowFile : flowFiles) {
final Map<String,String> matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms)); final Map<String,String> matched = (matchAll ? matchAll(flowFile, attributePattern, dictionaryTerms) : matchAny(flowFile, attributePattern, dictionaryTerms));
flowFile = session.putAllAttributes(flowFile, matched); flowFile = session.putAllAttributes(flowFile, matched);
final Relationship relationship = (((matched.size() == (attributeNameMatches.size() * 3) && matchAll) || (matched.size() > 0 && !matchAll))) ? REL_MATCHED : REL_UNMATCHED; final Relationship relationship = (((matched.size() == (attributeNameMatches.size() * 3) && matchAll) || (matched.size() > 0 && !matchAll))) ? REL_MATCHED : REL_UNMATCHED;
session.getProvenanceReporter().route(flowFile, relationship); session.getProvenanceReporter().route(flowFile, relationship);
session.transfer(flowFile, relationship); session.transfer(flowFile, relationship);
@ -258,48 +254,46 @@ public class ScanAttribute extends AbstractProcessor {
} }
} }
private Map<String,String> matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map<String,String> dictionary) { private Map<String,String> matchAny(final FlowFile flowFile, final Pattern attributePattern, final Map<String,String> dictionary) {
Map<String,String> dictionaryTermMatches = new HashMap<String,String>(); Map<String,String> dictionaryTermMatches = new HashMap<String,String>();
attributeNameMatches = new HashSet<String>(); attributeNameMatches = new HashSet<String>();
int hitCounter = 0;
int hitCounter = 0;
for (final Map.Entry<String, String> attribute : flowFile.getAttributes().entrySet()) { for (final Map.Entry<String, String> attribute : flowFile.getAttributes().entrySet()) {
if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) {
attributeNameMatches.add(attribute.getKey()); attributeNameMatches.add(attribute.getKey());
if (dictionary.containsKey(attribute.getValue())) { if (dictionary.containsKey(attribute.getValue())) {
hitCounter++; hitCounter++;
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue()));
} }
} }
} }
return dictionaryTermMatches; return dictionaryTermMatches;
} }
private Map<String,String> matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map<String,String> dictionary) {
Map<String,String> dictionaryTermMatches = new HashMap<String,String>();
attributeNameMatches = new HashSet<String>();
int hitCounter = 0; private Map<String,String> matchAll(final FlowFile flowFile, final Pattern attributePattern, final Map<String,String> dictionary) {
Map<String,String> dictionaryTermMatches = new HashMap<String,String>();
attributeNameMatches = new HashSet<String>();
int hitCounter = 0;
for (final Map.Entry<String, String> attribute : flowFile.getAttributes().entrySet()) { for (final Map.Entry<String, String> attribute : flowFile.getAttributes().entrySet()) {
if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) { if (attributePattern == null || attributePattern.matcher(attribute.getKey()).matches()) {
attributeNameMatches.add(attribute.getKey()); attributeNameMatches.add(attribute.getKey());
if (dictionary.containsKey(attribute.getValue())) { if (dictionary.containsKey(attribute.getValue())) {
hitCounter++; hitCounter++;
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey()); dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".attribute", attribute.getKey());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue()); dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".term", attribute.getValue());
dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue())); dictionaryTermMatches.put("dictionary.hit." + hitCounter + ".metadata", dictionary.get(attribute.getValue()));
} } else {
else //if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario.
{ dictionaryTermMatches.clear();
//if one attribute value is not found in the dictionary then no need to continue since this is a matchAll scenario. break;
dictionaryTermMatches.clear();
break;
} }
} }
} }

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -148,12 +148,12 @@ public class TestScanAttribute {
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
runner.clearTransferState(); runner.clearTransferState();
} }
@Test @Test
public void testSingleMatchWithMeta() { public void testSingleMatchWithMeta() {
final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute());
FlowFile f; FlowFile f;
runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta");
runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":");
@ -165,19 +165,19 @@ public class TestScanAttribute {
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0);
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata");
assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana");
assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit");
runner.clearTransferState(); runner.clearTransferState();
attributes.remove("produce_name"); attributes.remove("produce_name");
runner.enqueue(new byte[0], attributes); runner.enqueue(new byte[0], attributes);
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_UNMATCHED, 1);
runner.clearTransferState(); runner.clearTransferState();
@ -188,10 +188,10 @@ public class TestScanAttribute {
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0);
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata");
assertEquals(f.getAttribute("dictionary.hit.1.term") ,"cherry"); assertEquals(f.getAttribute("dictionary.hit.1.term") ,"cherry");
assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red fruit"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red fruit");
runner.clearTransferState(); runner.clearTransferState();
@ -208,27 +208,27 @@ public class TestScanAttribute {
public void testAllMatchWithMeta() { public void testAllMatchWithMeta() {
final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute());
FlowFile f; FlowFile f;
runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta"); runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary1_meta");
runner.setProperty(ScanAttribute.MATCHING_CRITERIA, ScanAttribute.MATCH_CRITERIA_ALL); runner.setProperty(ScanAttribute.MATCHING_CRITERIA, ScanAttribute.MATCH_CRITERIA_ALL);
runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "pro.*"); runner.setProperty(ScanAttribute.ATTRIBUTE_PATTERN, "pro.*");
runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":");
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("produce_name", "banana"); attributes.put("produce_name", "banana");
runner.enqueue(new byte[0], attributes); runner.enqueue(new byte[0], attributes);
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0);
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata");
assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana");
assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit");
runner.clearTransferState(); runner.clearTransferState();
attributes.remove("produce_name"); attributes.remove("produce_name");
@ -249,18 +249,18 @@ public class TestScanAttribute {
runner.enqueue(new byte[0], attributes); runner.enqueue(new byte[0], attributes);
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0);
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata");
assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana");
assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.term");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.metadata"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.2.metadata");
assertEquals(f.getAttribute("dictionary.hit.2.term") ,"corn"); assertEquals(f.getAttribute("dictionary.hit.2.term") ,"corn");
assertEquals(f.getAttribute("dictionary.hit.2.metadata"), "yellow vegetable"); assertEquals(f.getAttribute("dictionary.hit.2.metadata"), "yellow vegetable");
} }
@ -270,7 +270,7 @@ public class TestScanAttribute {
final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute()); final TestRunner runner = TestRunners.newTestRunner(new ScanAttribute());
runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta"); runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-empty-new-lines_meta");
runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":");
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("produce_name", ""); attributes.put("produce_name", "");
@ -294,7 +294,7 @@ public class TestScanAttribute {
runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-extra-info_meta"); runner.setProperty(ScanAttribute.DICTIONARY_FILE, "src/test/resources/ScanAttribute/dictionary-with-extra-info_meta");
runner.setProperty(ScanAttribute.DICTIONARY_FILTER, "(.*)<fruit>"); runner.setProperty(ScanAttribute.DICTIONARY_FILTER, "(.*)<fruit>");
runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":"); runner.setProperty(ScanAttribute.DICTIONARY_ENTRY_METADATA_DEMARCATOR, ":");
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("produce_name", "banana"); attributes.put("produce_name", "banana");
@ -303,10 +303,10 @@ public class TestScanAttribute {
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0);
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata");
assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana"); assertEquals(f.getAttribute("dictionary.hit.1.term") ,"banana");
assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "yellow fruit");
runner.clearTransferState(); runner.clearTransferState();
@ -323,13 +323,13 @@ public class TestScanAttribute {
runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1); runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0); f = runner.getFlowFilesForRelationship(ScanAttribute.REL_MATCHED).get(0);
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.term");
runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata"); runner.assertAllFlowFilesContainAttribute("dictionary.hit.1.metadata");
assertEquals(f.getAttribute("dictionary.hit.1.term") ,"tomatoe"); assertEquals(f.getAttribute("dictionary.hit.1.term") ,"tomatoe");
assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red vegetable"); assertEquals(f.getAttribute("dictionary.hit.1.metadata"), "red vegetable");
runner.clearTransferState(); runner.clearTransferState();
} }