mirror of https://github.com/apache/nifi.git
parent
493919922e
commit
d6a2409d71
|
@ -20,6 +20,8 @@
|
||||||
<artifactId>nifi-enrich-bundle</artifactId>
|
<artifactId>nifi-enrich-bundle</artifactId>
|
||||||
<version>1.0.0-SNAPSHOT</version>
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
|
|
||||||
<artifactId>nifi-enrich-processors</artifactId>
|
<artifactId>nifi-enrich-processors</artifactId>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -50,5 +52,16 @@
|
||||||
<artifactId>findbugs-annotations</artifactId>
|
<artifactId>findbugs-annotations</artifactId>
|
||||||
<version>1.3.9-1</version>
|
<version>1.3.9-1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<version>4.11</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mock</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -0,0 +1,161 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.enrich;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
|
||||||
|
public static final PropertyDescriptor QUERY_INPUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("QUERY_INPUT")
|
||||||
|
.displayName("Lookup value")
|
||||||
|
.required(true)
|
||||||
|
.description("The value that should be used to populate the query")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final AllowableValue SPLIT= new AllowableValue("Split", "Split",
|
||||||
|
"Use a delimiter character or RegEx to split the results into attributes");
|
||||||
|
public static final AllowableValue REGEX = new AllowableValue("RegEx", "RegEx",
|
||||||
|
"Use a regular expression to split the results into attributes ");
|
||||||
|
public static final AllowableValue NONE = new AllowableValue("None", "None",
|
||||||
|
"Do not split results");
|
||||||
|
|
||||||
|
public static final PropertyDescriptor QUERY_PARSER = new PropertyDescriptor.Builder()
|
||||||
|
.name("QUERY_PARSER")
|
||||||
|
.displayName("Results Parser")
|
||||||
|
.description("The method used to slice the results into attribute groups")
|
||||||
|
.allowableValues(SPLIT, REGEX, NONE)
|
||||||
|
.required(true)
|
||||||
|
.defaultValue(NONE.getValue())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor QUERY_PARSER_INPUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("QUERY_PARSER_INPUT")
|
||||||
|
.displayName("Parser RegEx")
|
||||||
|
.description("Choice between a splitter and regex matcher used to parse the results of the query into attribute groups")
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
public static final Relationship REL_FOUND = new Relationship.Builder()
|
||||||
|
.name("found")
|
||||||
|
.description("Where to route flow files after successfully enriching attributes with data")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
|
||||||
|
.name("not found")
|
||||||
|
.description("Where to route flow files if data enrichment query rendered no results")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||||
|
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
|
||||||
|
|
||||||
|
final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue();
|
||||||
|
final boolean QUERY_PARSER_INPUT_isSet = validationContext.getProperty(QUERY_PARSER_INPUT).isSet();
|
||||||
|
|
||||||
|
if ((!chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( !QUERY_PARSER_INPUT_isSet )) {
|
||||||
|
results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
|
||||||
|
.explanation("Split and Regex parsers require a valid Regular Expression")
|
||||||
|
.valid(false)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( QUERY_PARSER_INPUT_isSet )) {
|
||||||
|
results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
|
||||||
|
.explanation("NONE parser does not support the use of Regular Expressions")
|
||||||
|
.valid(false)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method returns the parsed record string in the form of
|
||||||
|
* a map of two strings, consisting of a iteration aware attribute
|
||||||
|
* names and its values
|
||||||
|
*
|
||||||
|
* @param recordPosition the iteration counter for the record
|
||||||
|
* @param rawResult the raw query results to be parsed
|
||||||
|
* @param queryParser The parsing mechanism being used to parse the data into groups
|
||||||
|
* @param queryRegex The regex to be used to split the query results into groups
|
||||||
|
* @return Map with attribute names and values
|
||||||
|
*/
|
||||||
|
protected Map<String, String> parseResponse(int recordPosition, String rawResult, String queryParser, String queryRegex, String schema) {
|
||||||
|
|
||||||
|
Map<String, String> results = new HashMap<>();
|
||||||
|
Pattern p;
|
||||||
|
|
||||||
|
|
||||||
|
// Iterates over the results using the QUERY_REGEX adding the captured groups
|
||||||
|
// as it progresses
|
||||||
|
|
||||||
|
switch (queryParser) {
|
||||||
|
case "Split":
|
||||||
|
// Time to Split the results...
|
||||||
|
String[] splitResult = rawResult.split(queryRegex);
|
||||||
|
for (int r = 0; r < splitResult.length; r++) {
|
||||||
|
results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), splitResult[r]);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "RegEx":
|
||||||
|
// RegEx was chosen, iterating...
|
||||||
|
p = Pattern.compile(queryRegex);
|
||||||
|
Matcher finalResult = p.matcher(rawResult);
|
||||||
|
if (finalResult.find()) {
|
||||||
|
// Note that RegEx matches capture group 0 is usually broad but starting with it anyway
|
||||||
|
// for the sake of purity
|
||||||
|
for (int r = 0; r < finalResult.groupCount(); r++) {
|
||||||
|
results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), finalResult.group(r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "None":
|
||||||
|
// Fails to NONE
|
||||||
|
default:
|
||||||
|
// NONE was chosen, just appending the record result as group0 without further splitting
|
||||||
|
results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group0", rawResult);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,276 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.enrich;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import javax.naming.Context;
|
||||||
|
import javax.naming.NameNotFoundException;
|
||||||
|
import javax.naming.NamingEnumeration;
|
||||||
|
import javax.naming.NamingException;
|
||||||
|
import javax.naming.directory.Attributes;
|
||||||
|
import javax.naming.directory.BasicAttributes;
|
||||||
|
import javax.naming.directory.DirContext;
|
||||||
|
import javax.naming.directory.InitialDirContext;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
|
||||||
|
@EventDriven
|
||||||
|
@SideEffectFree
|
||||||
|
@SupportsBatching
|
||||||
|
@Tags({"dns", "enrich", "ip"})
|
||||||
|
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||||
|
@CapabilityDescription("A powerful DNS query processor primary designed to enrich DataFlows with DNS based APIs " +
|
||||||
|
"(e.g. RBLs, ShadowServer's ASN lookup) but that can be also used to perform regular DNS lookups.")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "enrich.dns.record*.group*", description = "The captured fields of the DNS query response for each of the records received"),
|
||||||
|
})
|
||||||
|
public class QueryDNS extends AbstractEnrichProcessor {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DNS_QUERY_TYPE = new PropertyDescriptor.Builder()
|
||||||
|
.name("DNS_QUERY_TYPE")
|
||||||
|
.displayName("DNS Query Type")
|
||||||
|
.description("The DNS query type to be used by the processor (e.g. TXT, A)")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("TXT")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DNS_SERVER = new PropertyDescriptor.Builder()
|
||||||
|
.name("DNS_SERVER")
|
||||||
|
.displayName("DNS Servers")
|
||||||
|
.description("A comma separated list of DNS servers to be used. (Defaults to system wide if none is used)")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DNS_TIMEOUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("DNS_TIMEOUT")
|
||||||
|
.displayName("DNS Query Timeout")
|
||||||
|
.description("The amount of milliseconds to wait until considering a query as failed")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("1500")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DNS_RETRIES = new PropertyDescriptor.Builder()
|
||||||
|
.name("DNS_RETRIES")
|
||||||
|
.displayName("DNS Query Retries")
|
||||||
|
.description("The number of attempts before giving up and moving on")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("1")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
private final static List<PropertyDescriptor> propertyDescriptors;
|
||||||
|
private final static Set<Relationship> relationships;
|
||||||
|
|
||||||
|
private DirContext ictx;
|
||||||
|
|
||||||
|
// Assign the default and generally used contextFactory value
|
||||||
|
private String contextFactory = com.sun.jndi.dns.DnsContextFactory.class.getName();;
|
||||||
|
|
||||||
|
static {
|
||||||
|
List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
|
props.add(QUERY_INPUT);
|
||||||
|
props.add(QUERY_PARSER);
|
||||||
|
props.add(QUERY_PARSER_INPUT);
|
||||||
|
props.add(DNS_RETRIES);
|
||||||
|
props.add(DNS_TIMEOUT);
|
||||||
|
props.add(DNS_SERVER);
|
||||||
|
props.add(DNS_QUERY_TYPE);
|
||||||
|
propertyDescriptors = Collections.unmodifiableList(props);
|
||||||
|
|
||||||
|
Set<Relationship> rels = new HashSet<>();
|
||||||
|
rels.add(REL_FOUND);
|
||||||
|
rels.add(REL_NOT_FOUND);
|
||||||
|
relationships = Collections.unmodifiableSet(rels);
|
||||||
|
}
|
||||||
|
private AtomicBoolean initialized = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return propertyDescriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
|
if (!initialized.get()) {
|
||||||
|
initializeResolver(context);
|
||||||
|
getLogger().warn("Resolver was initialized at onTrigger instead of onScheduled");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String queryType = context.getProperty(DNS_QUERY_TYPE).getValue();
|
||||||
|
final String queryInput = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String queryParser = context.getProperty(QUERY_PARSER).getValue();
|
||||||
|
final String queryRegex = context.getProperty(QUERY_PARSER_INPUT).getValue();
|
||||||
|
|
||||||
|
boolean found = false;
|
||||||
|
try {
|
||||||
|
Attributes results = doLookup(queryInput, queryType);
|
||||||
|
// NOERROR & NODATA seem to return empty Attributes handled bellow
|
||||||
|
// but defaulting to not found in any case
|
||||||
|
if (results.size() < 1) {
|
||||||
|
found = false;
|
||||||
|
} else {
|
||||||
|
int recordNumber = 0;
|
||||||
|
NamingEnumeration<?> dnsEntryIterator = results.get(queryType).getAll();
|
||||||
|
|
||||||
|
while (dnsEntryIterator.hasMoreElements()) {
|
||||||
|
String dnsRecord = dnsEntryIterator.next().toString();
|
||||||
|
// While NXDOMAIN is being generated by doLookup catch
|
||||||
|
|
||||||
|
if (dnsRecord != "NXDOMAIN") {
|
||||||
|
Map<String, String> parsedResults = parseResponse(recordNumber, dnsRecord, queryParser, queryRegex, "dns");
|
||||||
|
flowFile = session.putAllAttributes(flowFile, parsedResults);
|
||||||
|
found = true;
|
||||||
|
} else {
|
||||||
|
// Otherwise treat as not found
|
||||||
|
found = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increase the counter and iterate over next record....
|
||||||
|
recordNumber++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (NamingException e) {
|
||||||
|
context.yield();
|
||||||
|
throw new ProcessException("Unexpected NamingException while processing records. Please review your configuration.", e);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally prepare to send the data down the pipeline
|
||||||
|
if (found) {
|
||||||
|
// Sending the resulting flowfile (with attributes) to REL_FOUND
|
||||||
|
session.transfer(flowFile, REL_FOUND);
|
||||||
|
} else {
|
||||||
|
// NXDOMAIN received, accepting the fate but forwarding
|
||||||
|
// to REL_NOT_FOUND
|
||||||
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@OnScheduled
|
||||||
|
public void onScheduled(ProcessContext context) {
|
||||||
|
try {
|
||||||
|
initializeResolver(context);
|
||||||
|
} catch (Exception e) {
|
||||||
|
context.yield();
|
||||||
|
throw new ProcessException("Failed to initialize the JNDI DNS resolver server", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void initializeResolver(final ProcessContext context ) {
|
||||||
|
|
||||||
|
final String dnsTimeout = context.getProperty(DNS_TIMEOUT).getValue();
|
||||||
|
final String dnsServer = context.getProperty(DNS_SERVER).getValue();
|
||||||
|
final String dnsRetries = context.getProperty(DNS_RETRIES).getValue();
|
||||||
|
|
||||||
|
|
||||||
|
String finalServer = "";
|
||||||
|
Hashtable<String,String> env = new Hashtable<String,String>();
|
||||||
|
env.put("java.naming.factory.initial", contextFactory);
|
||||||
|
env.put("com.sun.jndi.dns.timeout.initial", dnsTimeout);
|
||||||
|
env.put("com.sun.jndi.dns.timeout.retries", dnsRetries);
|
||||||
|
if (StringUtils.isNotEmpty(dnsServer)) {
|
||||||
|
for (String server : dnsServer.split(",")) {
|
||||||
|
finalServer = finalServer + "dns://" + server + "/. ";
|
||||||
|
}
|
||||||
|
env.put(Context.PROVIDER_URL, finalServer);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
initializeContext(env);
|
||||||
|
initialized.set(true);
|
||||||
|
} catch (NamingException e) {
|
||||||
|
getLogger().error("Could not initialize JNDI context", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method performs a simple DNS lookup using JNDI
|
||||||
|
* @param queryInput String containing the query body itself (e.g. 4.3.3.1.in-addr.arpa);
|
||||||
|
* @param queryType String containign the query type (e.g. TXT);
|
||||||
|
*/
|
||||||
|
protected Attributes doLookup(String queryInput, String queryType) throws NamingException {
|
||||||
|
// This is a simple DNS lookup attempt
|
||||||
|
|
||||||
|
Attributes attrs;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Uses pre-existing context to resolve
|
||||||
|
attrs = ictx.getAttributes(queryInput, new String[]{queryType});
|
||||||
|
return attrs;
|
||||||
|
} catch ( NameNotFoundException e) {
|
||||||
|
getLogger().debug("Resolution for domain {} failed due to {}", new Object[]{queryInput, e});
|
||||||
|
attrs = new BasicAttributes(queryType, "NXDOMAIN",true);
|
||||||
|
return attrs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This was separated from main code to ease the creation of test units injecting fake JNDI data
|
||||||
|
// back into the processor.
|
||||||
|
protected void initializeContext(Hashtable<String,String> env) throws NamingException {
|
||||||
|
this.ictx = new InitialDirContext(env);
|
||||||
|
this.initialized = new AtomicBoolean(false);
|
||||||
|
initialized.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -14,3 +14,4 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
org.apache.nifi.processors.GeoEnrichIP
|
org.apache.nifi.processors.GeoEnrichIP
|
||||||
|
org.apache.nifi.processors.enrich.QueryDNS
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.enrich;
|
||||||
|
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import javax.naming.Context;
|
||||||
|
import javax.naming.NamingException;
|
||||||
|
import javax.naming.directory.DirContext;
|
||||||
|
import javax.naming.spi.InitialContextFactory;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
|
||||||
|
|
||||||
|
public class FakeDNSInitialDirContextFactory implements InitialContextFactory {
|
||||||
|
private static DirContext mockContext = null;
|
||||||
|
|
||||||
|
public static DirContext getLatestMockContext() {
|
||||||
|
if (mockContext == null) {
|
||||||
|
return CreateMockContext();
|
||||||
|
} else {
|
||||||
|
return mockContext;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Context getInitialContext(Hashtable environment) throws NamingException {
|
||||||
|
return CreateMockContext();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DirContext CreateMockContext() {
|
||||||
|
synchronized (FakeDNSInitialDirContextFactory.class) {
|
||||||
|
mockContext = (DirContext) Mockito.mock(DirContext.class);
|
||||||
|
}
|
||||||
|
return mockContext;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,227 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.enrich;
|
||||||
|
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
|
||||||
|
import javax.naming.Context;
|
||||||
|
import javax.naming.directory.Attributes;
|
||||||
|
import javax.naming.directory.BasicAttribute;
|
||||||
|
import javax.naming.directory.BasicAttributes;
|
||||||
|
import javax.naming.directory.DirContext;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class TestQueryDNS {
|
||||||
|
private QueryDNS queryDNS;
|
||||||
|
private TestRunner queryDNSTestRunner;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setupTest() throws Exception {
|
||||||
|
this.queryDNS = new QueryDNS();
|
||||||
|
this.queryDNSTestRunner = TestRunners.newTestRunner(queryDNS);
|
||||||
|
|
||||||
|
Hashtable env = new Hashtable<String, String>();
|
||||||
|
env.put(Context.INITIAL_CONTEXT_FACTORY, FakeDNSInitialDirContextFactory.class.getName());
|
||||||
|
|
||||||
|
this.queryDNS.initializeContext(env);
|
||||||
|
|
||||||
|
final DirContext mockContext = FakeDNSInitialDirContextFactory.getLatestMockContext();
|
||||||
|
|
||||||
|
// Capture JNDI's getAttibutes method containing the (String) queryValue and (String[]) queryType
|
||||||
|
Mockito.when( mockContext.getAttributes(Mockito.anyString(), Mockito.any(String[].class)))
|
||||||
|
.thenAnswer(new Answer() {
|
||||||
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
// Craft a false DNS response
|
||||||
|
// Note the DNS response will not make use of any of the mocked
|
||||||
|
// query contents (all input is discarded and replies synthetically
|
||||||
|
// generated
|
||||||
|
return craftResponse(invocation);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVanillaQueryWithoutSplit() {
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "PTR");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(3, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(2, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(1, '.'):trim()}" +
|
||||||
|
".in-addr.arpa");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.NONE.getValue());
|
||||||
|
|
||||||
|
final Map<String, String> attributeMap = new HashMap<>();
|
||||||
|
attributeMap.put("ip_address", "123.123.123.123");
|
||||||
|
|
||||||
|
queryDNSTestRunner.enqueue(new byte[0], attributeMap);
|
||||||
|
queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes());
|
||||||
|
|
||||||
|
queryDNSTestRunner.run(1,true, false);
|
||||||
|
|
||||||
|
List<MockFlowFile> results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND);
|
||||||
|
assertTrue(results.size() == 1);
|
||||||
|
String result = results.get(0).getAttribute("enrich.dns.record0.group0");
|
||||||
|
|
||||||
|
assertTrue(result.contains("apache.nifi.org"));
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidDataWithSplit() {
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "TXT");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(3, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(2, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(1, '.'):trim()}" +
|
||||||
|
".origin.asn.nifi.apache.org");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.SPLIT.getValue());
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|");
|
||||||
|
|
||||||
|
final Map<String, String> attributeMap = new HashMap<>();
|
||||||
|
attributeMap.put("ip_address", "123.123.123.123");
|
||||||
|
|
||||||
|
queryDNSTestRunner.enqueue(new byte[0], attributeMap);
|
||||||
|
queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes());
|
||||||
|
queryDNSTestRunner.run(1,true, false);
|
||||||
|
|
||||||
|
List<MockFlowFile> results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND);
|
||||||
|
assertTrue(results.size() == 1);
|
||||||
|
|
||||||
|
results.get(0).assertAttributeEquals("enrich.dns.record0.group5", " Apache NiFi");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidDataWithRegex() {
|
||||||
|
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "TXT");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(3, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(2, '.'):trim()}" +
|
||||||
|
".${ip_address:getDelimitedField(1, '.'):trim()}" +
|
||||||
|
".origin.asn.nifi.apache.org");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue());
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\.*(\\sApache\\sNiFi)$");
|
||||||
|
|
||||||
|
final Map<String, String> attributeMap = new HashMap<>();
|
||||||
|
attributeMap.put("ip_address", "123.123.123.123");
|
||||||
|
|
||||||
|
queryDNSTestRunner.enqueue(new byte[0], attributeMap);
|
||||||
|
queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes());
|
||||||
|
queryDNSTestRunner.run(1, true, false);
|
||||||
|
|
||||||
|
List<MockFlowFile> results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND);
|
||||||
|
assertTrue(results.size() == 1);
|
||||||
|
|
||||||
|
results.get(0).assertAttributeEquals("enrich.dns.record0.group0", " Apache NiFi");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidData() {
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "AAAA");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "nifi.apache.org");
|
||||||
|
|
||||||
|
|
||||||
|
final Map<String, String> attributeMap = new HashMap<>();
|
||||||
|
attributeMap.put("ip_address", "123.123.123.123");
|
||||||
|
|
||||||
|
queryDNSTestRunner.enqueue(new byte[0], attributeMap);
|
||||||
|
queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes());
|
||||||
|
queryDNSTestRunner.run(1, true, false);
|
||||||
|
|
||||||
|
List<MockFlowFile> results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_NOT_FOUND);
|
||||||
|
assertTrue(results.size() == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidator() {
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "AAAA");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000");
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "nifi.apache.org");
|
||||||
|
// Note the absence of a QUERY_PARSER_INPUT value
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue());
|
||||||
|
queryDNSTestRunner.assertNotValid();
|
||||||
|
|
||||||
|
// Note the presence of a QUERY_PARSER_INPUT value
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue());
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|");
|
||||||
|
queryDNSTestRunner.assertValid();
|
||||||
|
|
||||||
|
// Note the presence of a QUERY_PARSER_INPUT value while NONE is set
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.NONE.getValue());
|
||||||
|
queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|");
|
||||||
|
queryDNSTestRunner.assertNotValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// Dummy pseudo-DNS responder
|
||||||
|
private Attributes craftResponse(InvocationOnMock invocation) {
|
||||||
|
Object[] arguments = invocation.getArguments();
|
||||||
|
String[] queryType = (String[]) arguments[1];
|
||||||
|
|
||||||
|
// Create attribute
|
||||||
|
Attributes attrs = new BasicAttributes(true);
|
||||||
|
BasicAttribute attr;
|
||||||
|
|
||||||
|
switch (queryType[0]) {
|
||||||
|
case "AAAA":
|
||||||
|
attr = new BasicAttribute("AAAA");
|
||||||
|
attrs.put(attr);
|
||||||
|
break;
|
||||||
|
case "TXT":
|
||||||
|
attr = new BasicAttribute("TXT", "666 | 123.123.123.123/32 | Apache-NIFI | AU | nifi.org | Apache NiFi");
|
||||||
|
attrs.put(attr);
|
||||||
|
break;
|
||||||
|
case "PTR":
|
||||||
|
attr = new BasicAttribute("PTR");
|
||||||
|
attr.add(0, "eg-apache.nifi.org.");
|
||||||
|
attr.add(1, "apache.nifi.org.");
|
||||||
|
attrs.put(attr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return attrs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue