diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
index 13c7902923..30f4cfb43f 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
@@ -51,6 +51,16 @@ The following binary components are provided under the Apache Software License v
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
+ (ASLv2) Apache Commons Net
+ The following NOTICE information applies:
+ Apache Commons Net
+ Copyright 2001-2016 The Apache Software Foundation
+
+ (ASLv2) Guava
+ The following NOTICE information applies:
+ Guava
+ Copyright 2015 The Guava Authors
+
(ASLv2) GeoIP2 Java API
The following NOTICE information applies:
GeoIP2 Java API
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
index 3def405d51..2268a484db 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
@@ -63,5 +63,31 @@
nifi-mock
test
+
+ commons-net
+ commons-net
+
+
+ com.google.guava
+ guava
+
+
+ org.powermock
+ powermock-module-junit4
+ 1.6.5
+ test
+
+
+ org.powermock
+ powermock-api-mockito
+ 1.6.5
+ test
+
+
+ org.powermock
+ powermock-api-mockito-common
+ 1.6.5
+ test
+
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
index c576ca24d3..0ba7934bba 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
@@ -18,6 +18,10 @@
package org.apache.nifi.processors.enrich;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -30,6 +34,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -62,10 +67,21 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
public static final PropertyDescriptor QUERY_PARSER_INPUT = new PropertyDescriptor.Builder()
.name("QUERY_PARSER_INPUT")
.displayName("Parser RegEx")
- .description("Choice between a splitter and regex matcher used to parse the results of the query into attribute groups")
+ .description("Choice between a splitter and regex matcher used to parse the results of the query into attribute groups.\n" +
+ "NOTE: This is a multiline regular expression, therefore, the DFM should decide how to handle trailing new line " +
+ "characters.")
.expressionLanguageSupported(false)
.required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor KEY_GROUP = new PropertyDescriptor.Builder()
+ .name("KEY_GROUP")
+ .displayName("Key lookup group (multiline / batch)")
+ .description("When performing a batched lookup, the following RegEx numbered capture group or Column number will be used to match " +
+ "the whois server response with the lookup field")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
@@ -85,18 +101,20 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
final List results = new ArrayList<>(super.customValidate(validationContext));
final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue();
- final boolean QUERY_PARSER_INPUT_isSet = validationContext.getProperty(QUERY_PARSER_INPUT).isSet();
- if ((!chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( !QUERY_PARSER_INPUT_isSet )) {
+ if (!chosenQUERY_PARSER.equals(NONE.getValue()) && !validationContext.getProperty(QUERY_PARSER_INPUT).isSet() ) {
results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+ .subject(QUERY_PARSER.getDisplayName())
.explanation("Split and Regex parsers require a valid Regular Expression")
.valid(false)
.build());
}
- if ((chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( QUERY_PARSER_INPUT_isSet )) {
- results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
- .explanation("NONE parser does not support the use of Regular Expressions")
+ if (chosenQUERY_PARSER.equals(NONE.getValue()) && validationContext.getProperty(QUERY_PARSER_INPUT).isSet()) {
+ results.add(new ValidationResult.Builder().input("QUERY_PARSER")
+ .subject(QUERY_PARSER_INPUT.getDisplayName())
+ .explanation("NONE parser does not support the use of Regular Expressions. " +
+ "Please select another parser or delete the regular expression entered in this field.")
.valid(false)
.build());
}
@@ -117,11 +135,11 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
* @param queryRegex The regex to be used to split the query results into groups
* @return Map with attribute names and values
*/
- protected Map parseResponse(int recordPosition, String rawResult, String queryParser, String queryRegex, String schema) {
+ protected Map parseResponse(String recordPosition, String rawResult, String queryParser, String queryRegex, String schema) {
Map results = new HashMap<>();
Pattern p;
-
+ recordPosition = StringUtils.isEmpty(recordPosition) ? "0" : recordPosition;
// Iterates over the results using the QUERY_REGEX adding the captured groups
// as it progresses
@@ -131,7 +149,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
// Time to Split the results...
String[] splitResult = rawResult.split(queryRegex);
for (int r = 0; r < splitResult.length; r++) {
- results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), splitResult[r]);
+ results.put("enrich." + schema + ".record" + recordPosition + ".group" + String.valueOf(r), splitResult[r]);
}
break;
@@ -143,7 +161,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
// Note that RegEx matches capture group 0 is usually broad but starting with it anyway
// for the sake of purity
for (int r = 0; r < finalResult.groupCount(); r++) {
- results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), finalResult.group(r));
+ results.put("enrich." + schema + ".record" + recordPosition + ".group" + String.valueOf(r), finalResult.group(r));
}
}
break;
@@ -152,10 +170,68 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
// Fails to NONE
default:
// NONE was chosen, just appending the record result as group0 without further splitting
- results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group0", rawResult);
+ results.put("enrich." + schema + ".record" + recordPosition + ".group0", rawResult);
break;
}
return results;
}
+ /**
+ * This method returns the parsed record string in the form of
+ * a map of two strings, consisting of a iteration aware attribute
+ * names and its values
+ *
+
+ * @param rawResult the raw query results to be parsed
+ * @param queryParser The parsing mechanism being used to parse the data into groups
+ * @param queryRegex The regex to be used to split the query results into groups. The regex MUST implement at least on named capture group "KEY" to be used to populate the table rows
+ * @param lookupKey The regular expression number or the column of a split to be used for matching
+ * @return Table with attribute names and values where each Table row uses the value of the KEY named capture group specified in @param queryRegex
+ */
+ protected Table parseBatchResponse(String rawResult, String queryParser, String queryRegex, int lookupKey, String schema) {
+ // Note the hardcoded record0.
+ // Since iteration is done within the parser and Multimap is used, the record number here will always be 0.
+ // Consequentially, 0 is hardcoded so that batched and non batched attributes follow the same naming
+ // conventions
+ final String recordPosition = ".record0";
+
+ final Table results = HashBasedTable.create();
+
+ switch (queryParser) {
+ case "Split":
+ Scanner scanner = new Scanner(rawResult);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ // Time to Split the results...
+ String[] splitResult = line.split(queryRegex);
+
+ for (int r = 0; r < splitResult.length; r++) {
+ results.put(splitResult[ lookupKey - 1 ], "enrich." + schema + recordPosition + ".group" + String.valueOf(r), splitResult[r]);
+ }
+ }
+ break;
+ case "RegEx":
+ // prepare the regex
+ Pattern p;
+ // Regex is multiline. Each line should include a KEY for lookup
+ p = Pattern.compile(queryRegex, Pattern.MULTILINE);
+
+ Matcher matcher = p.matcher(rawResult);
+ while (matcher.find()) {
+ try {
+ // Note that RegEx matches capture group 0 is usually broad but starting with it anyway
+ // for the sake of purity
+ for (int r = 0; r <= matcher.groupCount(); r++) {
+ results.put(matcher.group(lookupKey), "enrich." + schema + recordPosition + ".group" + String.valueOf(r), matcher.group(r));
+ }
+ } catch (IndexOutOfBoundsException e) {
+ getLogger().warn("Could not find capture group {} while processing result. You may want to review your " +
+ "Regular Expression to match against the content \"{}\"", new Object[]{lookupKey, rawResult});
+ }
+ }
+ break;
+ }
+
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
index bb346c6e0e..362bb2e846 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
@@ -176,7 +176,8 @@ public class QueryDNS extends AbstractEnrichProcessor {
// While NXDOMAIN is being generated by doLookup catch
if (dnsRecord != "NXDOMAIN") {
- Map parsedResults = parseResponse(recordNumber, dnsRecord, queryParser, queryRegex, "dns");
+ // Map parsedResults = parseResponse(recordNumber, dnsRecord, queryParser, queryRegex, "dns");
+ Map parsedResults = parseResponse(String.valueOf(recordNumber), dnsRecord, queryParser, queryRegex, "dns");
flowFile = session.putAllAttributes(flowFile, parsedResults);
found = true;
} else {
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
new file mode 100644
index 0000000000..faf305bf33
--- /dev/null
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.net.whois.WhoisClient;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"whois", "enrich", "ip"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("A powerful whois query processor primary designed to enrich DataFlows with whois based APIs " +
+ "(e.g. ShadowServer's ASN lookup) but that can be also used to perform regular whois lookups.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "enrich.dns.record*.group*", description = "The captured fields of the Whois query response for each of the records received"),
+})
+public class QueryWhois extends AbstractEnrichProcessor {
+
+ public static final AllowableValue BEGIN_END = new AllowableValue("Begin/End", "Begin/End",
+ "The evaluated input of each flowfile is enclosed within begin and end tags. Each row contains a delimited set of fields");
+
+ public static final AllowableValue BULK_NONE = new AllowableValue("None", "None",
+ "Queries are made without any particular dialect");
+
+
+ public static final PropertyDescriptor WHOIS_QUERY_TYPE = new PropertyDescriptor.Builder()
+ .name("WHOIS_QUERY_TYPE")
+ .displayName("Whois Query Type")
+ .description("The Whois query type to be used by the processor (if used)")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor WHOIS_SERVER = new PropertyDescriptor.Builder()
+ .name("WHOIS_SERVER")
+ .displayName("Whois Server")
+ .description("The Whois server to be used")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor WHOIS_SERVER_PORT = new PropertyDescriptor.Builder()
+ .name("WHOIS_SERVER_PORT")
+ .displayName("Whois Server Port")
+ .description("The TCP port of the remote Whois server")
+ .required(true)
+ .defaultValue("43")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor WHOIS_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("WHOIS_TIMEOUT")
+ .displayName("Whois Query Timeout")
+ .description("The amount of time to wait until considering a query as failed")
+ .required(true)
+ .defaultValue("1500 ms")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("BATCH_SIZE")
+ .displayName("Batch Size")
+ .description("The number of incoming FlowFiles to process in a single execution of this processor. ")
+ .required(true)
+ .defaultValue("25")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BULK_PROTOCOL = new PropertyDescriptor.Builder()
+ .name("BULK_PROTOCOL")
+ .displayName("Bulk Protocol")
+ .description("The protocol used to perform the bulk query. ")
+ .required(true)
+ .defaultValue(BULK_NONE.getValue())
+ .allowableValues(BEGIN_END, BULK_NONE)
+ .build();
+
+ @Override
+ public List customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>(super.customValidate(validationContext));
+
+ final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue();
+
+ if (!chosenQUERY_PARSER.equals(NONE.getValue()) && !validationContext.getProperty(QUERY_PARSER_INPUT).isSet() ) {
+ results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+ .subject(QUERY_PARSER_INPUT.getDisplayName())
+ .explanation("Split and Regex parsers require a valid Regular Expression")
+ .valid(false)
+ .build());
+ }
+
+
+ if (validationContext.getProperty(BATCH_SIZE).asInteger() > 1 && !validationContext.getProperty(KEY_GROUP).isSet() ) {
+ results.add(new ValidationResult.Builder().input("KEY_GROUP")
+ .subject(KEY_GROUP.getDisplayName())
+ .explanation("when operating in Batching mode, RegEx and Split parsers require a " +
+ "valid capture group/matching column. Configure the processor batch size to 1" +
+ " or enter a valid column / named capture value.")
+ .valid(false)
+ .build());
+ }
+
+ if ( validationContext.getProperty(BATCH_SIZE).asInteger() > 1 && chosenQUERY_PARSER.equals(NONE.getValue()) ) {
+ results.add(new ValidationResult.Builder().input(validationContext.getProperty(BATCH_SIZE).getValue())
+ .subject(QUERY_PARSER.getDisplayName())
+ .explanation("NONE parser does not support batching. Configure Batch Size to 1 or use another parser.")
+ .valid(false)
+ .build());
+ }
+
+ if ( validationContext.getProperty(BATCH_SIZE).asInteger() == 1 && !validationContext.getProperty(BULK_PROTOCOL).getValue().equals(BULK_NONE.getValue()) ) {
+ results.add(new ValidationResult.Builder().input("BULK_PROTOCOL")
+ .subject(BATCH_SIZE.getDisplayName())
+ .explanation("Bulk protocol requirement requires batching. Configure Batch Size to more than 1 or " +
+ "use another protocol.")
+ .valid(false)
+ .build());
+ }
+
+
+
+ return results;
+ }
+
+ private final static List propertyDescriptors;
+ private final static Set relationships;
+
+ private WhoisClient whoisClient;
+
+ static {
+ List props = new ArrayList<>();
+ props.add(QUERY_INPUT);
+ props.add(WHOIS_QUERY_TYPE);
+ props.add(WHOIS_SERVER);
+ props.add(WHOIS_SERVER_PORT);
+ props.add(WHOIS_TIMEOUT);
+ props.add(BATCH_SIZE);
+ props.add(BULK_PROTOCOL);
+ props.add(QUERY_PARSER);
+ props.add(QUERY_PARSER_INPUT);
+ props.add(KEY_GROUP);
+ propertyDescriptors = Collections.unmodifiableList(props);
+
+ Set rels = new HashSet<>();
+ rels.add(REL_FOUND);
+ rels.add(REL_NOT_FOUND);
+ relationships = Collections.unmodifiableSet(rels);
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ List flowFiles = session.get(batchSize);
+
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ context.yield();
+ return;
+ }
+
+ // Build query
+ String buildString = "";
+ final String queryType = context.getProperty(WHOIS_QUERY_TYPE).getValue();
+
+ // Verify the the protocol mode and craft the "begin" pseudo-command, otherwise just the query type
+ buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue()) ? buildString.concat("begin ") : buildString.concat("");
+
+ // Append the query type
+ buildString = context.getProperty(WHOIS_QUERY_TYPE).isSet() ? buildString.concat(queryType + " " ) : buildString.concat("");
+
+ // A new line is required when working on Begin/End
+ buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue()) ? buildString.concat("\n") : buildString.concat("");
+
+ // append the values
+ for (FlowFile flowFile : flowFiles) {
+ final String evaluatedInput = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
+ buildString = buildString + evaluatedInput + "\n";
+ }
+
+ // Verify the the protocol mode and craft the "end" pseudo-command, otherwise just the query type
+ buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue()) ? buildString.concat("end") : buildString.concat("");
+
+
+ final String queryParser = context.getProperty(QUERY_PARSER).getValue();
+ final String queryRegex = context.getProperty(QUERY_PARSER_INPUT).getValue();
+ final int keyLookup = context.getProperty(KEY_GROUP).asInteger();
+ final int whoisTimeout = context.getProperty(WHOIS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final String whoisServer = context.getProperty(WHOIS_SERVER).getValue();
+ final int whoisPort = context.getProperty(WHOIS_SERVER_PORT).asInteger();
+
+ final List flowFilesMatched = new ArrayList();
+ final List flowFilesNotMatched = new ArrayList();
+
+
+ String result = doLookup(whoisServer, whoisPort, whoisTimeout, buildString);
+ if (StringUtils.isEmpty(result)) {
+ // If nothing was returned, let the processor continue its life and transfer the batch to REL_NOT_FOUND
+ session.transfer(flowFiles, REL_NOT_FOUND);
+ return;
+ } else {
+ // Run as normal
+ for (FlowFile flowFile : flowFiles) {
+ // Check the batchSize. If 1, run normal parser
+ if (batchSize == 1) {
+
+ Map parsedResults = parseResponse(null, result, queryParser, queryRegex, "whois");
+
+ if (parsedResults.isEmpty()) {
+ // parsedResults didn't return anything valid, sending to not found.
+ flowFilesNotMatched.add(flowFile);
+ } else {
+ // Still, extraction is needed
+ flowFile = session.putAllAttributes(flowFile, parsedResults);
+ flowFilesMatched.add(flowFile);
+
+ // Finished processing single result
+ }
+ } else {
+ // Otherwise call the multiline parser and get the row map;
+ final Map> rowMap = parseBatchResponse(result, queryParser, queryRegex, keyLookup, "whois").rowMap();
+
+ // Identify the flowfile Lookupvalue and search against the rowMap
+ String ffLookupValue = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
+
+ if (rowMap.containsKey(ffLookupValue)) {
+ // flowfile Lookup Value is contained within the results, get the properties and add to matched list
+ flowFile = session.putAllAttributes(flowFile, rowMap.get(ffLookupValue));
+ flowFilesMatched.add(flowFile);
+ } else {
+ // otherwise add to Not Matched
+ flowFilesNotMatched.add(flowFile);
+ }
+ }
+ }
+ }
+
+ // Finally prepare to send the data down the pipeline
+ // Because batches may include matches and non-matches, test both and send
+ // each to its relationship
+ if (flowFilesMatched.size() > 0) {
+ // Sending the resulting flowfile (with attributes) to REL_FOUND
+ session.transfer(flowFilesMatched, REL_FOUND);
+ }
+ if (flowFilesNotMatched.size() > 0) {
+ // Sending whatetver didn't match to REL_NOT_FOUND
+ session.transfer(flowFilesNotMatched, REL_NOT_FOUND);
+ }
+ }
+
+ /**
+ * This method performs a simple Whois lookup
+ * @param whoisServer Server to be queried;
+ * @param whoisPort TCP port to be useed to connect to server
+ * @param whoisTimeout How long to wait for a response (in ms);
+ * @param query The query to be made;
+ */
+ protected String doLookup(String whoisServer, int whoisPort, int whoisTimeout, String query) {
+ // This is a simple WHOIS lookup attempt
+
+ String result = null;
+
+ whoisClient = createClient();
+
+ try {
+ // Uses pre-existing context to resolve
+ if (!whoisClient.isConnected()) {
+ whoisClient.connect(whoisServer, whoisPort);
+ whoisClient.setSoTimeout(whoisTimeout);
+ result = whoisClient.query(query);
+ // clean up...
+ if (whoisClient.isConnected()) whoisClient.disconnect();
+ }
+ } catch ( IOException e) {
+ getLogger().error("Query failed due to {}", new Object[]{e.getMessage()}, e);
+ throw new ProcessException("Error performing Whois Lookup", e);
+ }
+ return result;
+ }
+
+
+ /*
+ Note createClient() was separated from the rest of code
+ in order to allow powermock to inject a fake return
+ during testing
+ */
+ protected WhoisClient createClient() {
+ return new WhoisClient();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e4a39f5595..5871892fe3 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
org.apache.nifi.processors.GeoEnrichIP
org.apache.nifi.processors.enrich.QueryDNS
+org.apache.nifi.processors.enrich.QueryWhois
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
index 593ff0864e..6e15f93601 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
@@ -155,6 +155,7 @@ public class TestQueryDNS {
@Test
public void testInvalidData() {
+ queryDNSTestRunner.removeProperty(QueryDNS.QUERY_PARSER_INPUT);
queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "AAAA");
queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000 ms");
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryWhois.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryWhois.java
new file mode 100644
index 0000000000..4c0a1f95ad
--- /dev/null
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryWhois.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+
+import org.apache.commons.net.whois.WhoisClient;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({WhoisClient.class})
+public class TestQueryWhois {
+ private QueryWhois queryWhois;
+ private TestRunner queryWhoisTestRunner;
+
+ @Before
+ public void setupTest() throws Exception {
+ // This is what is sent by Mockito
+ String header = "AS | IP | BGP Prefix | CC | Registry | Allocated | Info | AS Name\n";
+ String responseBodyLine1 = "999 | 123.123.123.123 | 123.123.123.123/32 | AU | apnic | 2014-01-01 | 2016-08-14 01:32:01 GMT | Apache NiFi\n";
+ String responseBodyLine2 = "333 | 124.124.124.124 | 124.124.124.124/32 | AU | apnic | 2014-01-01 | 2016-08-14 01:32:01 GMT | Apache NiFi\n";
+
+ WhoisClient whoisClient = PowerMockito.mock(WhoisClient.class);
+ Mockito.when(whoisClient.query(Mockito.anyString())).thenReturn(header + responseBodyLine1 + responseBodyLine2);
+
+ this.queryWhois = new QueryWhois() {
+ @Override
+ protected WhoisClient createClient(){
+ return whoisClient;
+ }
+ };
+ this.queryWhoisTestRunner = TestRunners.newTestRunner(queryWhois);
+
+ }
+
+
+
+ @Test
+ public void testCustomValidator() {
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "peer");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "nifi.apache.org");
+
+ // Note the absence of a QUERY_PARSER_INPUT value
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+ queryWhoisTestRunner.assertNotValid();
+
+
+ // Note the presence of a QUERY_PARSER_INPUT value
+ queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+ queryWhoisTestRunner.assertValid();
+
+ // Note BULK_PROTOCOL and BATCH_SIZE
+ queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+ queryWhoisTestRunner.setProperty(QueryWhois.BULK_PROTOCOL, QueryWhois.BEGIN_END.getValue());
+ queryWhoisTestRunner.assertNotValid();
+
+ // Note the presence of a QUERY_PARSER_INPUT value while NONE is set
+ queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+ queryWhoisTestRunner.assertNotValid();
+//
+ queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
+ queryWhoisTestRunner.removeProperty(QueryWhois.QUERY_PARSER_INPUT);
+ queryWhoisTestRunner.assertNotValid();
+
+ queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+ queryWhoisTestRunner.removeProperty(QueryWhois.KEY_GROUP);
+ queryWhoisTestRunner.assertNotValid();
+
+ queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+ queryWhoisTestRunner.removeProperty(QueryWhois.KEY_GROUP);
+ queryWhoisTestRunner.assertNotValid();
+
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
+ queryWhoisTestRunner.assertNotValid();
+
+
+ }
+
+
+ @Test
+ public void testValidDataWithSplit() {
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(3, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(2, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(1, '.'):trim()}");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.SPLIT.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\s+\\|\\s+");
+ queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "2");
+
+ final Map attributeMap1 = new HashMap<>();
+ final Map attributeMap2 = new HashMap<>();
+ final Map attributeMap3 = new HashMap<>();
+ attributeMap1.put("ip_address", "123.123.123.123");
+ attributeMap2.put("ip_address", "124.124.124.124");
+ attributeMap3.put("ip_address", "125.125.125.125");
+
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
+ queryWhoisTestRunner.run();
+
+ List matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
+ assertTrue(matchingResults.size() == 2);
+ List nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
+ assertTrue(nonMatchingResults.size() == 1);
+
+ matchingResults.get(0).assertAttributeEquals("enrich.whois.record0.group7", "Apache NiFi");
+ }
+
+ @Test
+ public void testValidDataWithRegex() {
+
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+ queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "2");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(3, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(2, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(1, '.'):trim()}");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\n^([^\\|]*)\\|\\s+(\\S+)\\s+\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|(.*)$");
+
+ final Map attributeMap1 = new HashMap<>();
+ final Map attributeMap2 = new HashMap<>();
+ final Map attributeMap3 = new HashMap<>();
+ attributeMap1.put("ip_address", "123.123.123.123");
+ attributeMap2.put("ip_address", "124.124.124.124");
+ attributeMap3.put("ip_address", "125.125.125.125");
+
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
+ queryWhoisTestRunner.run();
+
+ List matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
+ assertTrue(matchingResults.size() == 2);
+ List nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
+ assertTrue(nonMatchingResults.size() == 1);
+
+ matchingResults.get(0).assertAttributeEquals("enrich.whois.record0.group8", " Apache NiFi");
+
+ }
+
+ @Test
+ public void testValidDataWithRegexButInvalidCaptureGroup() {
+
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
+ queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+ queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "9");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(3, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(2, '.'):trim()}" +
+ ".${ip_address:getDelimitedField(1, '.'):trim()}");
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+ queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\n^([^\\|]*)\\|\\s+(\\S+)\\s+\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|(.*)$");
+
+ final Map attributeMap1 = new HashMap<>();
+ final Map attributeMap2 = new HashMap<>();
+ final Map attributeMap3 = new HashMap<>();
+ attributeMap1.put("ip_address", "123.123.123.123");
+ attributeMap2.put("ip_address", "124.124.124.124");
+ attributeMap3.put("ip_address", "125.125.125.125");
+
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
+ queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
+ queryWhoisTestRunner.run();
+
+ List matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
+ assertTrue(matchingResults.size() == 0);
+ List nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
+ assertTrue(nonMatchingResults.size() == 3);
+
+ }
+
+}
+