NIFI-1971 - Introduce QueryWhois processor with batching (i.e. netcat protocol) support

This closes #858.
Andre F de Miranda 2016-08-09 23:01:53 +10:00 committed by Pierre Villard
parent 2054888fb1
commit 25b7dfa9b1
8 changed files with 698 additions and 13 deletions
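For context, the "netcat protocol" referenced in the title is the newline-delimited bulk whois dialect exposed by services such as ShadowServer's ASN lookup: many lookup values are wrapped between "begin" and "end" pseudo-commands and submitted over a single TCP connection (hence the netcat association). A sketch of one such request, using the "origin" query type exercised by the tests below:

begin origin
123.123.123.123
124.124.124.124
end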

View File

@@ -51,6 +51,16 @@ The following binary components are provided under the Apache Software License v
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Commons Net
The following NOTICE information applies:
Apache Commons Net
Copyright 2001-2016 The Apache Software Foundation
(ASLv2) Guava
The following NOTICE information applies:
Guava
Copyright 2015 The Guava Authors
(ASLv2) GeoIP2 Java API
The following NOTICE information applies:
GeoIP2 Java API

View File

@@ -63,5 +63,31 @@
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.6.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<version>1.6.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito-common</artifactId>
<version>1.6.5</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -18,6 +18,10 @@
package org.apache.nifi.processors.enrich;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -30,6 +34,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -62,10 +67,21 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
public static final PropertyDescriptor QUERY_PARSER_INPUT = new PropertyDescriptor.Builder()
.name("QUERY_PARSER_INPUT")
.displayName("Parser RegEx")
.description("Choice between a splitter and regex matcher used to parse the results of the query into attribute groups")
.description("Choice between a splitter and regex matcher used to parse the results of the query into attribute groups.\n" +
"NOTE: This is a multiline regular expression, therefore, the DFM should decide how to handle trailing new line " +
"characters.")
.expressionLanguageSupported(false)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor KEY_GROUP = new PropertyDescriptor.Builder()
.name("KEY_GROUP")
.displayName("Key lookup group (multiline / batch)")
.description("When performing a batched lookup, the following RegEx numbered capture group or Column number will be used to match " +
"the whois server response with the lookup field")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
@@ -85,18 +101,20 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue();
final boolean QUERY_PARSER_INPUT_isSet = validationContext.getProperty(QUERY_PARSER_INPUT).isSet();
if ((!chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( !QUERY_PARSER_INPUT_isSet )) {
if (!chosenQUERY_PARSER.equals(NONE.getValue()) && !validationContext.getProperty(QUERY_PARSER_INPUT).isSet() ) {
results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
.subject(QUERY_PARSER.getDisplayName())
.explanation("Split and Regex parsers require a valid Regular Expression")
.valid(false)
.build());
}
if ((chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( QUERY_PARSER_INPUT_isSet )) {
results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
.explanation("NONE parser does not support the use of Regular Expressions")
if (chosenQUERY_PARSER.equals(NONE.getValue()) && validationContext.getProperty(QUERY_PARSER_INPUT).isSet()) {
results.add(new ValidationResult.Builder().input("QUERY_PARSER")
.subject(QUERY_PARSER_INPUT.getDisplayName())
.explanation("NONE parser does not support the use of Regular Expressions. " +
"Please select another parser or delete the regular expression entered in this field.")
.valid(false)
.build());
}
@@ -117,11 +135,11 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
* @param queryRegex The regex to be used to split the query results into groups
* @return Map with attribute names and values
*/
protected Map<String, String> parseResponse(int recordPosition, String rawResult, String queryParser, String queryRegex, String schema) {
protected Map<String, String> parseResponse(String recordPosition, String rawResult, String queryParser, String queryRegex, String schema) {
Map<String, String> results = new HashMap<>();
Pattern p;
recordPosition = StringUtils.isEmpty(recordPosition) ? "0" : recordPosition;
// Iterates over the results using the QUERY_REGEX adding the captured groups
// as it progresses
@@ -131,7 +149,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
// Time to Split the results...
String[] splitResult = rawResult.split(queryRegex);
for (int r = 0; r < splitResult.length; r++) {
results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), splitResult[r]);
results.put("enrich." + schema + ".record" + recordPosition + ".group" + String.valueOf(r), splitResult[r]);
}
break;
@@ -143,7 +161,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
// Note that capture group 0 of a RegEx match is usually broad, but start with it anyway
// for the sake of completeness
for (int r = 0; r < finalResult.groupCount(); r++) {
results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), finalResult.group(r));
results.put("enrich." + schema + ".record" + recordPosition + ".group" + String.valueOf(r), finalResult.group(r));
}
}
break;
@@ -152,10 +170,68 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
// Fails to NONE
default:
// NONE was chosen, just appending the record result as group0 without further splitting
results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group0", rawResult);
results.put("enrich." + schema + ".record" + recordPosition + ".group0", rawResult);
break;
}
return results;
}
/**
* This method returns the parsed record string in the form of
* a Table keyed by the lookup value, whose columns are iteration-aware
* attribute names and whose cells hold their values
*
* @param rawResult the raw query results to be parsed
* @param queryParser The parsing mechanism being used to parse the data into groups
* @param queryRegex The regex to be used to split the query results into groups. The regex MUST contain at least one capture group identifying the lookup KEY, used to populate the table rows
* @param lookupKey The regex capture group number or split column number to be used for matching
* @return Table with attribute names and values, where each table row is keyed by the value of the capture group or column identified by lookupKey
*/
protected Table<String, String, String> parseBatchResponse(String rawResult, String queryParser, String queryRegex, int lookupKey, String schema) {
// Note the hardcoded record0.
// Since iteration is done within the parser and a Table is used, the record number here will always be 0.
// Consequently, 0 is hardcoded so that batched and non-batched attributes follow the same naming
// conventions
final String recordPosition = ".record0";
final Table<String, String, String> results = HashBasedTable.create();
switch (queryParser) {
case "Split":
Scanner scanner = new Scanner(rawResult);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
// Time to Split the results...
String[] splitResult = line.split(queryRegex);
for (int r = 0; r < splitResult.length; r++) {
results.put(splitResult[ lookupKey - 1 ], "enrich." + schema + recordPosition + ".group" + String.valueOf(r), splitResult[r]);
}
}
break;
case "RegEx":
// prepare the regex
Pattern p;
// Regex is multiline. Each line should include a KEY for lookup
p = Pattern.compile(queryRegex, Pattern.MULTILINE);
Matcher matcher = p.matcher(rawResult);
while (matcher.find()) {
try {
// Note that capture group 0 of a RegEx match is usually broad, but start with it anyway
// for the sake of completeness
for (int r = 0; r <= matcher.groupCount(); r++) {
results.put(matcher.group(lookupKey), "enrich." + schema + recordPosition + ".group" + String.valueOf(r), matcher.group(r));
}
} catch (IndexOutOfBoundsException e) {
getLogger().warn("Could not find capture group {} while processing result. You may want to review your " +
"Regular Expression to match against the content \"{}\"", new Object[]{lookupKey, rawResult});
}
}
break;
}
return results;
}
}
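For illustration, a minimal standalone sketch (not part of the commit; class name hypothetical) of how the Split branch of parseBatchResponse keys its results. With lookupKey = 2, the second delimited field becomes the Table row key, so each FlowFile's lookup value can later be matched via rowMap():

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public class BatchParseSketch {
    public static void main(String[] args) {
        // Two response rows in the delimited format used by the tests below
        String rawResult =
                "999 | 123.123.123.123 | 123.123.123.123/32 | AU | apnic\n" +
                "333 | 124.124.124.124 | 124.124.124.124/32 | AU | apnic\n";
        int lookupKey = 2; // 1-based split column used as the row key (KEY_GROUP)
        Table<String, String, String> results = HashBasedTable.create();
        for (String line : rawResult.split("\n")) {
            String[] fields = line.split("\\s+\\|\\s+");
            for (int r = 0; r < fields.length; r++) {
                results.put(fields[lookupKey - 1], "enrich.whois.record0.group" + r, fields[r]);
            }
        }
        // Each row is keyed by the IP; its columns are the iteration-aware attribute names
        System.out.println(results.row("123.123.123.123"));
    }
}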

View File

@@ -176,7 +176,8 @@ public class QueryDNS extends AbstractEnrichProcessor {
// NXDOMAIN is the sentinel value generated by doLookup's catch block
if (!"NXDOMAIN".equals(dnsRecord)) {
Map<String, String> parsedResults = parseResponse(recordNumber, dnsRecord, queryParser, queryRegex, "dns");
// Map<String, String> parsedResults = parseResponse(recordNumber, dnsRecord, queryParser, queryRegex, "dns");
Map<String, String> parsedResults = parseResponse(String.valueOf(recordNumber), dnsRecord, queryParser, queryRegex, "dns");
flowFile = session.putAllAttributes(flowFile, parsedResults);
found = true;
} else {

View File

@@ -0,0 +1,347 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.enrich;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.whois.WhoisClient;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"whois", "enrich", "ip"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("A powerful whois query processor, primarily designed to enrich DataFlows with whois-based APIs " +
"(e.g. ShadowServer's ASN lookup), but which can also be used to perform regular whois lookups.")
@WritesAttributes({
@WritesAttribute(attribute = "enrich.whois.record*.group*", description = "The captured fields of the Whois query response for each of the records received"),
})
public class QueryWhois extends AbstractEnrichProcessor {
public static final AllowableValue BEGIN_END = new AllowableValue("Begin/End", "Begin/End",
"The evaluated input of each flowfile is enclosed within begin and end tags. Each row contains a delimited set of fields");
public static final AllowableValue BULK_NONE = new AllowableValue("None", "None",
"Queries are made without any particular dialect");
public static final PropertyDescriptor WHOIS_QUERY_TYPE = new PropertyDescriptor.Builder()
.name("WHOIS_QUERY_TYPE")
.displayName("Whois Query Type")
.description("The Whois query type to be used by the processor (if used)")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WHOIS_SERVER = new PropertyDescriptor.Builder()
.name("WHOIS_SERVER")
.displayName("Whois Server")
.description("The Whois server to be used")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WHOIS_SERVER_PORT = new PropertyDescriptor.Builder()
.name("WHOIS_SERVER_PORT")
.displayName("Whois Server Port")
.description("The TCP port of the remote Whois server")
.required(true)
.defaultValue("43")
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor WHOIS_TIMEOUT = new PropertyDescriptor.Builder()
.name("WHOIS_TIMEOUT")
.displayName("Whois Query Timeout")
.description("The amount of time to wait until considering a query as failed")
.required(true)
.defaultValue("1500 ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor. ")
.required(true)
.defaultValue("25")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor BULK_PROTOCOL = new PropertyDescriptor.Builder()
.name("BULK_PROTOCOL")
.displayName("Bulk Protocol")
.description("The protocol used to perform the bulk query. ")
.required(true)
.defaultValue(BULK_NONE.getValue())
.allowableValues(BEGIN_END, BULK_NONE)
.build();
@Override
public List<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue();
if (!chosenQUERY_PARSER.equals(NONE.getValue()) && !validationContext.getProperty(QUERY_PARSER_INPUT).isSet() ) {
results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
.subject(QUERY_PARSER_INPUT.getDisplayName())
.explanation("Split and Regex parsers require a valid Regular Expression")
.valid(false)
.build());
}
if (validationContext.getProperty(BATCH_SIZE).asInteger() > 1 && !validationContext.getProperty(KEY_GROUP).isSet() ) {
results.add(new ValidationResult.Builder().input("KEY_GROUP")
.subject(KEY_GROUP.getDisplayName())
.explanation("when operating in Batching mode, RegEx and Split parsers require a " +
"valid capture group/matching column. Configure the processor batch size to 1" +
" or enter a valid column / named capture value.")
.valid(false)
.build());
}
if ( validationContext.getProperty(BATCH_SIZE).asInteger() > 1 && chosenQUERY_PARSER.equals(NONE.getValue()) ) {
results.add(new ValidationResult.Builder().input(validationContext.getProperty(BATCH_SIZE).getValue())
.subject(QUERY_PARSER.getDisplayName())
.explanation("NONE parser does not support batching. Configure Batch Size to 1 or use another parser.")
.valid(false)
.build());
}
if ( validationContext.getProperty(BATCH_SIZE).asInteger() == 1 && !validationContext.getProperty(BULK_PROTOCOL).getValue().equals(BULK_NONE.getValue()) ) {
results.add(new ValidationResult.Builder().input("BULK_PROTOCOL")
.subject(BATCH_SIZE.getDisplayName())
.explanation("Bulk protocol requirement requires batching. Configure Batch Size to more than 1 or " +
"use another protocol.")
.valid(false)
.build());
}
return results;
}
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
private WhoisClient whoisClient;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(QUERY_INPUT);
props.add(WHOIS_QUERY_TYPE);
props.add(WHOIS_SERVER);
props.add(WHOIS_SERVER_PORT);
props.add(WHOIS_TIMEOUT);
props.add(BATCH_SIZE);
props.add(BULK_PROTOCOL);
props.add(QUERY_PARSER);
props.add(QUERY_PARSER_INPUT);
props.add(KEY_GROUP);
propertyDescriptors = Collections.unmodifiableList(props);
Set<Relationship> rels = new HashSet<>();
rels.add(REL_FOUND);
rels.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(rels);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.isEmpty()) {
context.yield();
return;
}
// Build query
String buildString = "";
final String queryType = context.getProperty(WHOIS_QUERY_TYPE).getValue();
// Verify the protocol mode and craft the "begin" pseudo-command; otherwise start with just the query type
buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue()) ? buildString.concat("begin ") : buildString.concat("");
// Append the query type
buildString = context.getProperty(WHOIS_QUERY_TYPE).isSet() ? buildString.concat(queryType + " " ) : buildString.concat("");
// A new line is required when working on Begin/End
buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue()) ? buildString.concat("\n") : buildString.concat("");
// append the values
for (FlowFile flowFile : flowFiles) {
final String evaluatedInput = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
buildString = buildString + evaluatedInput + "\n";
}
// Verify the protocol mode and craft the "end" pseudo-command; otherwise append nothing
buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue()) ? buildString.concat("end") : buildString.concat("");
final String queryParser = context.getProperty(QUERY_PARSER).getValue();
final String queryRegex = context.getProperty(QUERY_PARSER_INPUT).getValue();
// KEY_GROUP may be unset when Batch Size is 1, so use Integer to avoid unboxing a null here
final Integer keyLookup = context.getProperty(KEY_GROUP).asInteger();
final int whoisTimeout = context.getProperty(WHOIS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final String whoisServer = context.getProperty(WHOIS_SERVER).getValue();
final int whoisPort = context.getProperty(WHOIS_SERVER_PORT).asInteger();
final List<FlowFile> flowFilesMatched = new ArrayList<FlowFile>();
final List<FlowFile> flowFilesNotMatched = new ArrayList<FlowFile>();
String result = doLookup(whoisServer, whoisPort, whoisTimeout, buildString);
if (StringUtils.isEmpty(result)) {
// If nothing was returned, let the processor continue its life and transfer the batch to REL_NOT_FOUND
session.transfer(flowFiles, REL_NOT_FOUND);
return;
} else {
// Run as normal
for (FlowFile flowFile : flowFiles) {
// Check the batchSize. If 1, run normal parser
if (batchSize == 1) {
Map<String, String> parsedResults = parseResponse(null, result, queryParser, queryRegex, "whois");
if (parsedResults.isEmpty()) {
// parsedResults didn't return anything valid, sending to not found.
flowFilesNotMatched.add(flowFile);
} else {
// Results were parsed; add them to the FlowFile as attributes
flowFile = session.putAllAttributes(flowFile, parsedResults);
flowFilesMatched.add(flowFile);
// Finished processing single result
}
} else {
// Otherwise call the multiline parser and get the row map;
final Map<String, Map<String, String>> rowMap = parseBatchResponse(result, queryParser, queryRegex, keyLookup, "whois").rowMap();
// Identify the flowfile lookup value and search it against the rowMap
String ffLookupValue = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
if (rowMap.containsKey(ffLookupValue)) {
// flowfile Lookup Value is contained within the results, get the properties and add to matched list
flowFile = session.putAllAttributes(flowFile, rowMap.get(ffLookupValue));
flowFilesMatched.add(flowFile);
} else {
// otherwise add to Not Matched
flowFilesNotMatched.add(flowFile);
}
}
}
}
// Finally prepare to send the data down the pipeline
// Because batches may include matches and non-matches, test both and send
// each to its relationship
if (flowFilesMatched.size() > 0) {
// Sending the resulting flowfile (with attributes) to REL_FOUND
session.transfer(flowFilesMatched, REL_FOUND);
}
if (flowFilesNotMatched.size() > 0) {
// Sending whatever didn't match to REL_NOT_FOUND
session.transfer(flowFilesNotMatched, REL_NOT_FOUND);
}
}
/**
* This method performs a simple Whois lookup
* @param whoisServer Server to be queried
* @param whoisPort TCP port used to connect to the server
* @param whoisTimeout How long to wait for a response (in ms)
* @param query The query to be made
* @return the raw whois response, or null if no query was performed
*/
protected String doLookup(String whoisServer, int whoisPort, int whoisTimeout, String query) {
// This is a simple WHOIS lookup attempt
String result = null;
whoisClient = createClient();
try {
// Only issue the query if the client is not already connected
if (!whoisClient.isConnected()) {
whoisClient.connect(whoisServer, whoisPort);
whoisClient.setSoTimeout(whoisTimeout);
result = whoisClient.query(query);
// clean up...
if (whoisClient.isConnected()) whoisClient.disconnect();
}
} catch ( IOException e) {
getLogger().error("Query failed due to {}", new Object[]{e.getMessage()}, e);
throw new ProcessException("Error performing Whois Lookup", e);
}
return result;
}
/*
Note createClient() was separated from the rest of the code
in order to allow PowerMock to inject a mock client
during testing
*/
protected WhoisClient createClient() {
return new WhoisClient();
}
}
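As a quick illustration of the query-building logic in onTrigger above (a standalone sketch under assumed inputs, not the processor itself; class name hypothetical): with three queued FlowFiles, the "origin" query type and the Begin/End bulk protocol, the single request sent to the whois server is assembled like this:

import java.util.Arrays;
import java.util.List;

public class BulkQuerySketch {
    public static void main(String[] args) {
        List<String> lookupValues = Arrays.asList("123.123.123.123", "124.124.124.124", "125.125.125.125");
        // Begin/End dialect: "begin <query type>", one lookup value per line, then "end"
        StringBuilder query = new StringBuilder("begin origin\n");
        for (String value : lookupValues) {
            query.append(value).append('\n');
        }
        query.append("end");
        System.out.println(query); // the single bulk request submitted over TCP port 43
    }
}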

View File

@@ -15,3 +15,4 @@
org.apache.nifi.processors.GeoEnrichIP
org.apache.nifi.processors.enrich.QueryDNS
org.apache.nifi.processors.enrich.QueryWhois

View File

@@ -155,6 +155,7 @@ public class TestQueryDNS {
@Test
public void testInvalidData() {
queryDNSTestRunner.removeProperty(QueryDNS.QUERY_PARSER_INPUT);
queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "AAAA");
queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000 ms");

View File

@@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.enrich;
import org.apache.commons.net.whois.WhoisClient;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PrepareForTest({WhoisClient.class})
public class TestQueryWhois {
private QueryWhois queryWhois;
private TestRunner queryWhoisTestRunner;
@Before
public void setupTest() throws Exception {
// This is what the mocked WhoisClient will return
String header = "AS | IP | BGP Prefix | CC | Registry | Allocated | Info | AS Name\n";
String responseBodyLine1 = "999 | 123.123.123.123 | 123.123.123.123/32 | AU | apnic | 2014-01-01 | 2016-08-14 01:32:01 GMT | Apache NiFi\n";
String responseBodyLine2 = "333 | 124.124.124.124 | 124.124.124.124/32 | AU | apnic | 2014-01-01 | 2016-08-14 01:32:01 GMT | Apache NiFi\n";
WhoisClient whoisClient = PowerMockito.mock(WhoisClient.class);
Mockito.when(whoisClient.query(Mockito.anyString())).thenReturn(header + responseBodyLine1 + responseBodyLine2);
this.queryWhois = new QueryWhois() {
@Override
protected WhoisClient createClient(){
return whoisClient;
}
};
this.queryWhoisTestRunner = TestRunners.newTestRunner(queryWhois);
}
@Test
public void testCustomValidator() {
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "peer");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "nifi.apache.org");
// Note the absence of a QUERY_PARSER_INPUT value
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
queryWhoisTestRunner.assertNotValid();
// Note the presence of a QUERY_PARSER_INPUT value
queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
queryWhoisTestRunner.assertValid();
// Note BULK_PROTOCOL and BATCH_SIZE
queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
queryWhoisTestRunner.setProperty(QueryWhois.BULK_PROTOCOL, QueryWhois.BEGIN_END.getValue());
queryWhoisTestRunner.assertNotValid();
// Note the presence of a QUERY_PARSER_INPUT value while NONE is set
queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
queryWhoisTestRunner.assertNotValid();
// Note a batch size greater than 1 combined with the NONE parser
queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
queryWhoisTestRunner.removeProperty(QueryWhois.QUERY_PARSER_INPUT);
queryWhoisTestRunner.assertNotValid();
queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
queryWhoisTestRunner.removeProperty(QueryWhois.KEY_GROUP);
queryWhoisTestRunner.assertNotValid();
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
queryWhoisTestRunner.assertNotValid();
}
@Test
public void testValidDataWithSplit() {
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
".${ip_address:getDelimitedField(3, '.'):trim()}" +
".${ip_address:getDelimitedField(2, '.'):trim()}" +
".${ip_address:getDelimitedField(1, '.'):trim()}");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.SPLIT.getValue());
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\s+\\|\\s+");
queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "2");
final Map<String, String> attributeMap1 = new HashMap<>();
final Map<String, String> attributeMap2 = new HashMap<>();
final Map<String, String> attributeMap3 = new HashMap<>();
attributeMap1.put("ip_address", "123.123.123.123");
attributeMap2.put("ip_address", "124.124.124.124");
attributeMap3.put("ip_address", "125.125.125.125");
queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
queryWhoisTestRunner.run();
List<MockFlowFile> matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
assertTrue(matchingResults.size() == 2);
List<MockFlowFile> nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
assertTrue(nonMatchingResults.size() == 1);
matchingResults.get(0).assertAttributeEquals("enrich.whois.record0.group7", "Apache NiFi");
}
@Test
public void testValidDataWithRegex() {
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "2");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
".${ip_address:getDelimitedField(3, '.'):trim()}" +
".${ip_address:getDelimitedField(2, '.'):trim()}" +
".${ip_address:getDelimitedField(1, '.'):trim()}");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\n^([^\\|]*)\\|\\s+(\\S+)\\s+\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|(.*)$");
final Map<String, String> attributeMap1 = new HashMap<>();
final Map<String, String> attributeMap2 = new HashMap<>();
final Map<String, String> attributeMap3 = new HashMap<>();
attributeMap1.put("ip_address", "123.123.123.123");
attributeMap2.put("ip_address", "124.124.124.124");
attributeMap3.put("ip_address", "125.125.125.125");
queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
queryWhoisTestRunner.run();
List<MockFlowFile> matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
assertTrue(matchingResults.size() == 2);
List<MockFlowFile> nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
assertTrue(nonMatchingResults.size() == 1);
matchingResults.get(0).assertAttributeEquals("enrich.whois.record0.group8", " Apache NiFi");
}
@Test
public void testValidDataWithRegexButInvalidCaptureGroup() {
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "9");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
".${ip_address:getDelimitedField(3, '.'):trim()}" +
".${ip_address:getDelimitedField(2, '.'):trim()}" +
".${ip_address:getDelimitedField(1, '.'):trim()}");
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\n^([^\\|]*)\\|\\s+(\\S+)\\s+\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|(.*)$");
final Map<String, String> attributeMap1 = new HashMap<>();
final Map<String, String> attributeMap2 = new HashMap<>();
final Map<String, String> attributeMap3 = new HashMap<>();
attributeMap1.put("ip_address", "123.123.123.123");
attributeMap2.put("ip_address", "124.124.124.124");
attributeMap3.put("ip_address", "125.125.125.125");
queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
queryWhoisTestRunner.run();
List<MockFlowFile> matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
assertTrue(matchingResults.size() == 0);
List<MockFlowFile> nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
assertTrue(nonMatchingResults.size() == 3);
}
}
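To see why KEY_GROUP = 2 succeeds in the RegEx tests above while 9 fails, here is a standalone sketch (class name hypothetical) applying the same multiline expression to one mocked response line; capture group 2 yields the IP that becomes the batch lookup key:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexKeySketch {
    public static void main(String[] args) {
        String line = "\n999 | 123.123.123.123 | 123.123.123.123/32 | AU | apnic | 2014-01-01 | 2016-08-14 01:32:01 GMT | Apache NiFi";
        // Same expression as QUERY_PARSER_INPUT in the tests above
        Pattern p = Pattern.compile(
                "\n^([^\\|]*)\\|\\s+(\\S+)\\s+\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|(.*)$",
                Pattern.MULTILINE);
        Matcher m = p.matcher(line);
        if (m.find()) {
            System.out.println(m.group(2)); // prints 123.123.123.123 -> the row key
            // m.group(9) would throw IndexOutOfBoundsException: only 8 capture groups exist,
            // which is why such batches end up routed to REL_NOT_FOUND
        }
    }
}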