From d6a2409d71f52b691d4202792419dfea48a422aa Mon Sep 17 00:00:00 2001 From: Andre F de Miranda Date: Sun, 5 Jun 2016 01:41:16 +1000 Subject: [PATCH] NIFI-1965 - Implement QueryDNS processor This closes #496. --- .../nifi-enrich-processors/pom.xml | 13 + .../enrich/AbstractEnrichProcessor.java | 161 ++++++++++ .../nifi/processors/enrich/QueryDNS.java | 276 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 3 +- .../FakeDNSInitialDirContextFactory.java | 52 ++++ .../nifi/processors/enrich/TestQueryDNS.java | 227 ++++++++++++++ 6 files changed, 731 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java create mode 100644 nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java create mode 100644 nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java create mode 100644 nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml index d91914bcd9..7059a2fc17 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml @@ -20,6 +20,8 @@ nifi-enrich-bundle 1.0.0-SNAPSHOT + + nifi-enrich-processors @@ -50,5 +52,16 @@ findbugs-annotations 1.3.9-1 + + junit + junit + 4.11 + test + + + org.apache.nifi + nifi-mock + test + diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java new file mode 100644 index 0000000000..c576ca24d3 --- /dev/null +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.enrich; + + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public abstract class AbstractEnrichProcessor extends AbstractProcessor { + public static final PropertyDescriptor QUERY_INPUT = new PropertyDescriptor.Builder() + .name("QUERY_INPUT") + .displayName("Lookup value") + .required(true) + .description("The value that should be used to populate the query") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final AllowableValue SPLIT= new AllowableValue("Split", "Split", + "Use a delimiter character or RegEx to split the results into attributes"); + public static final AllowableValue REGEX = new AllowableValue("RegEx", "RegEx", + "Use a regular expression to split the results into attributes "); + public static final AllowableValue NONE = new AllowableValue("None", "None", + "Do not split results"); + + public static final PropertyDescriptor QUERY_PARSER = new PropertyDescriptor.Builder() + .name("QUERY_PARSER") + .displayName("Results Parser") + .description("The method used to slice the results into attribute groups") + .allowableValues(SPLIT, REGEX, NONE) + .required(true) + .defaultValue(NONE.getValue()) + .build(); + + public static final PropertyDescriptor QUERY_PARSER_INPUT = new PropertyDescriptor.Builder() + .name("QUERY_PARSER_INPUT") + .displayName("Parser RegEx") + .description("Choice between a splitter and regex matcher used to parse the results of the query into attribute groups") + .expressionLanguageSupported(false) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + + public static final Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("Where to route flow files after successfully enriching attributes with data") + .build(); + + public static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("Where to route flow files if data enrichment query rendered no results") + .build(); + + + @Override + public List customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(super.customValidate(validationContext)); + + final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue(); + final boolean QUERY_PARSER_INPUT_isSet = validationContext.getProperty(QUERY_PARSER_INPUT).isSet(); + + if ((!chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( !QUERY_PARSER_INPUT_isSet )) { + results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT") + .explanation("Split and Regex parsers require a valid Regular Expression") + .valid(false) + .build()); + } + + if ((chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( QUERY_PARSER_INPUT_isSet )) { + results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT") + .explanation("NONE parser does not support the use of Regular Expressions") + .valid(false) + .build()); + } + + return results; + } + + + + /** + * This method returns the parsed record string in the form of + * a map of two strings, consisting of a iteration aware attribute + * names and its values + * + * @param recordPosition the iteration counter for the record + * @param rawResult the raw query results to be parsed + * @param queryParser The parsing mechanism being used to parse the data into groups + * @param queryRegex The regex to be used to split the query results into groups + * @return Map with attribute names and values + */ + protected Map parseResponse(int recordPosition, String rawResult, String queryParser, String queryRegex, String schema) { + + Map results = new HashMap<>(); + Pattern p; + + + // Iterates over the results using the QUERY_REGEX adding the captured groups + // as it progresses + + switch (queryParser) { + case "Split": + // Time to Split the results... + String[] splitResult = rawResult.split(queryRegex); + for (int r = 0; r < splitResult.length; r++) { + results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), splitResult[r]); + } + break; + + case "RegEx": + // RegEx was chosen, iterating... + p = Pattern.compile(queryRegex); + Matcher finalResult = p.matcher(rawResult); + if (finalResult.find()) { + // Note that RegEx matches capture group 0 is usually broad but starting with it anyway + // for the sake of purity + for (int r = 0; r < finalResult.groupCount(); r++) { + results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group" + String.valueOf(r), finalResult.group(r)); + } + } + break; + + case "None": + // Fails to NONE + default: + // NONE was chosen, just appending the record result as group0 without further splitting + results.put("enrich." + schema + ".record" + String.valueOf(recordPosition) + ".group0", rawResult); + break; + } + return results; + } + +} diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java new file mode 100644 index 0000000000..d80a72cba8 --- /dev/null +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.enrich; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.naming.Context; +import javax.naming.NameNotFoundException; +import javax.naming.NamingEnumeration; +import javax.naming.NamingException; +import javax.naming.directory.Attributes; +import javax.naming.directory.BasicAttributes; +import javax.naming.directory.DirContext; +import javax.naming.directory.InitialDirContext; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"dns", "enrich", "ip"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("A powerful DNS query processor primary designed to enrich DataFlows with DNS based APIs " + + "(e.g. RBLs, ShadowServer's ASN lookup) but that can be also used to perform regular DNS lookups.") +@WritesAttributes({ + @WritesAttribute(attribute = "enrich.dns.record*.group*", description = "The captured fields of the DNS query response for each of the records received"), +}) +public class QueryDNS extends AbstractEnrichProcessor { + + public static final PropertyDescriptor DNS_QUERY_TYPE = new PropertyDescriptor.Builder() + .name("DNS_QUERY_TYPE") + .displayName("DNS Query Type") + .description("The DNS query type to be used by the processor (e.g. TXT, A)") + .required(true) + .defaultValue("TXT") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DNS_SERVER = new PropertyDescriptor.Builder() + .name("DNS_SERVER") + .displayName("DNS Servers") + .description("A comma separated list of DNS servers to be used. (Defaults to system wide if none is used)") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DNS_TIMEOUT = new PropertyDescriptor.Builder() + .name("DNS_TIMEOUT") + .displayName("DNS Query Timeout") + .description("The amount of milliseconds to wait until considering a query as failed") + .required(true) + .defaultValue("1500") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DNS_RETRIES = new PropertyDescriptor.Builder() + .name("DNS_RETRIES") + .displayName("DNS Query Retries") + .description("The number of attempts before giving up and moving on") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + + private final static List propertyDescriptors; + private final static Set relationships; + + private DirContext ictx; + + // Assign the default and generally used contextFactory value + private String contextFactory = com.sun.jndi.dns.DnsContextFactory.class.getName();; + + static { + List props = new ArrayList<>(); + props.add(QUERY_INPUT); + props.add(QUERY_PARSER); + props.add(QUERY_PARSER_INPUT); + props.add(DNS_RETRIES); + props.add(DNS_TIMEOUT); + props.add(DNS_SERVER); + props.add(DNS_QUERY_TYPE); + propertyDescriptors = Collections.unmodifiableList(props); + + Set rels = new HashSet<>(); + rels.add(REL_FOUND); + rels.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(rels); + } + private AtomicBoolean initialized = new AtomicBoolean(false); + + + @Override + protected List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + + @Override + public Set getRelationships() { + return relationships; + } + + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + if (!initialized.get()) { + initializeResolver(context); + getLogger().warn("Resolver was initialized at onTrigger instead of onScheduled"); + + } + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String queryType = context.getProperty(DNS_QUERY_TYPE).getValue(); + final String queryInput = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue(); + final String queryParser = context.getProperty(QUERY_PARSER).getValue(); + final String queryRegex = context.getProperty(QUERY_PARSER_INPUT).getValue(); + + boolean found = false; + try { + Attributes results = doLookup(queryInput, queryType); + // NOERROR & NODATA seem to return empty Attributes handled bellow + // but defaulting to not found in any case + if (results.size() < 1) { + found = false; + } else { + int recordNumber = 0; + NamingEnumeration dnsEntryIterator = results.get(queryType).getAll(); + + while (dnsEntryIterator.hasMoreElements()) { + String dnsRecord = dnsEntryIterator.next().toString(); + // While NXDOMAIN is being generated by doLookup catch + + if (dnsRecord != "NXDOMAIN") { + Map parsedResults = parseResponse(recordNumber, dnsRecord, queryParser, queryRegex, "dns"); + flowFile = session.putAllAttributes(flowFile, parsedResults); + found = true; + } else { + // Otherwise treat as not found + found = false; + } + + // Increase the counter and iterate over next record.... + recordNumber++; + } + } + } catch (NamingException e) { + context.yield(); + throw new ProcessException("Unexpected NamingException while processing records. Please review your configuration.", e); + + } + + // Finally prepare to send the data down the pipeline + if (found) { + // Sending the resulting flowfile (with attributes) to REL_FOUND + session.transfer(flowFile, REL_FOUND); + } else { + // NXDOMAIN received, accepting the fate but forwarding + // to REL_NOT_FOUND + session.transfer(flowFile, REL_NOT_FOUND); + } + } + + + @OnScheduled + public void onScheduled(ProcessContext context) { + try { + initializeResolver(context); + } catch (Exception e) { + context.yield(); + throw new ProcessException("Failed to initialize the JNDI DNS resolver server", e); + } + } + + + protected void initializeResolver(final ProcessContext context ) { + + final String dnsTimeout = context.getProperty(DNS_TIMEOUT).getValue(); + final String dnsServer = context.getProperty(DNS_SERVER).getValue(); + final String dnsRetries = context.getProperty(DNS_RETRIES).getValue(); + + + String finalServer = ""; + Hashtable env = new Hashtable(); + env.put("java.naming.factory.initial", contextFactory); + env.put("com.sun.jndi.dns.timeout.initial", dnsTimeout); + env.put("com.sun.jndi.dns.timeout.retries", dnsRetries); + if (StringUtils.isNotEmpty(dnsServer)) { + for (String server : dnsServer.split(",")) { + finalServer = finalServer + "dns://" + server + "/. "; + } + env.put(Context.PROVIDER_URL, finalServer); + } + + try { + initializeContext(env); + initialized.set(true); + } catch (NamingException e) { + getLogger().error("Could not initialize JNDI context", e); + } + } + + + /** + * This method performs a simple DNS lookup using JNDI + * @param queryInput String containing the query body itself (e.g. 4.3.3.1.in-addr.arpa); + * @param queryType String containign the query type (e.g. TXT); + */ + protected Attributes doLookup(String queryInput, String queryType) throws NamingException { + // This is a simple DNS lookup attempt + + Attributes attrs; + + try { + // Uses pre-existing context to resolve + attrs = ictx.getAttributes(queryInput, new String[]{queryType}); + return attrs; + } catch ( NameNotFoundException e) { + getLogger().debug("Resolution for domain {} failed due to {}", new Object[]{queryInput, e}); + attrs = new BasicAttributes(queryType, "NXDOMAIN",true); + return attrs; + } + } + + // This was separated from main code to ease the creation of test units injecting fake JNDI data + // back into the processor. + protected void initializeContext(Hashtable env) throws NamingException { + this.ictx = new InitialDirContext(env); + this.initialized = new AtomicBoolean(false); + initialized.set(true); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 9b1be71888..e4a39f5595 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.GeoEnrichIP \ No newline at end of file +org.apache.nifi.processors.GeoEnrichIP +org.apache.nifi.processors.enrich.QueryDNS diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java new file mode 100644 index 0000000000..a00dab6e91 --- /dev/null +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.enrich; + +import org.mockito.Mockito; + +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.directory.DirContext; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + + +public class FakeDNSInitialDirContextFactory implements InitialContextFactory { + private static DirContext mockContext = null; + + public static DirContext getLatestMockContext() { + if (mockContext == null) { + return CreateMockContext(); + } else { + return mockContext; + } + } + + @Override + public Context getInitialContext(Hashtable environment) throws NamingException { + return CreateMockContext(); + } + + private static DirContext CreateMockContext() { + synchronized (FakeDNSInitialDirContextFactory.class) { + mockContext = (DirContext) Mockito.mock(DirContext.class); + } + return mockContext; + } +} + diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java new file mode 100644 index 0000000000..65c1a503df --- /dev/null +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.enrich; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + + +import javax.naming.Context; +import javax.naming.directory.Attributes; +import javax.naming.directory.BasicAttribute; +import javax.naming.directory.BasicAttributes; +import javax.naming.directory.DirContext; + +import static org.junit.Assert.assertTrue; + +public class TestQueryDNS { + private QueryDNS queryDNS; + private TestRunner queryDNSTestRunner; + + @Before + public void setupTest() throws Exception { + this.queryDNS = new QueryDNS(); + this.queryDNSTestRunner = TestRunners.newTestRunner(queryDNS); + + Hashtable env = new Hashtable(); + env.put(Context.INITIAL_CONTEXT_FACTORY, FakeDNSInitialDirContextFactory.class.getName()); + + this.queryDNS.initializeContext(env); + + final DirContext mockContext = FakeDNSInitialDirContextFactory.getLatestMockContext(); + + // Capture JNDI's getAttibutes method containing the (String) queryValue and (String[]) queryType + Mockito.when( mockContext.getAttributes(Mockito.anyString(), Mockito.any(String[].class))) + .thenAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Throwable { + // Craft a false DNS response + // Note the DNS response will not make use of any of the mocked + // query contents (all input is discarded and replies synthetically + // generated + return craftResponse(invocation); + } + }); + } + + @Test + public void testVanillaQueryWithoutSplit() { + queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "PTR"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" + + ".${ip_address:getDelimitedField(3, '.'):trim()}" + + ".${ip_address:getDelimitedField(2, '.'):trim()}" + + ".${ip_address:getDelimitedField(1, '.'):trim()}" + + ".in-addr.arpa"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.NONE.getValue()); + + final Map attributeMap = new HashMap<>(); + attributeMap.put("ip_address", "123.123.123.123"); + + queryDNSTestRunner.enqueue(new byte[0], attributeMap); + queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes()); + + queryDNSTestRunner.run(1,true, false); + + List results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND); + assertTrue(results.size() == 1); + String result = results.get(0).getAttribute("enrich.dns.record0.group0"); + + assertTrue(result.contains("apache.nifi.org")); + + + } + + @Test + public void testValidDataWithSplit() { + queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "TXT"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" + + ".${ip_address:getDelimitedField(3, '.'):trim()}" + + ".${ip_address:getDelimitedField(2, '.'):trim()}" + + ".${ip_address:getDelimitedField(1, '.'):trim()}" + + ".origin.asn.nifi.apache.org"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.SPLIT.getValue()); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|"); + + final Map attributeMap = new HashMap<>(); + attributeMap.put("ip_address", "123.123.123.123"); + + queryDNSTestRunner.enqueue(new byte[0], attributeMap); + queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes()); + queryDNSTestRunner.run(1,true, false); + + List results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND); + assertTrue(results.size() == 1); + + results.get(0).assertAttributeEquals("enrich.dns.record0.group5", " Apache NiFi"); + } + + @Test + public void testValidDataWithRegex() { + + queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "TXT"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "${ip_address:getDelimitedField(4, '.'):trim()}" + + ".${ip_address:getDelimitedField(3, '.'):trim()}" + + ".${ip_address:getDelimitedField(2, '.'):trim()}" + + ".${ip_address:getDelimitedField(1, '.'):trim()}" + + ".origin.asn.nifi.apache.org"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue()); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\.*(\\sApache\\sNiFi)$"); + + final Map attributeMap = new HashMap<>(); + attributeMap.put("ip_address", "123.123.123.123"); + + queryDNSTestRunner.enqueue(new byte[0], attributeMap); + queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes()); + queryDNSTestRunner.run(1, true, false); + + List results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND); + assertTrue(results.size() == 1); + + results.get(0).assertAttributeEquals("enrich.dns.record0.group0", " Apache NiFi"); + + } + + @Test + public void testInvalidData() { + queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "AAAA"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "nifi.apache.org"); + + + final Map attributeMap = new HashMap<>(); + attributeMap.put("ip_address", "123.123.123.123"); + + queryDNSTestRunner.enqueue(new byte[0], attributeMap); + queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes()); + queryDNSTestRunner.run(1, true, false); + + List results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_NOT_FOUND); + assertTrue(results.size() == 1); + } + + @Test + public void testCustomValidator() { + queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "AAAA"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1"); + queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000"); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, "nifi.apache.org"); + // Note the absence of a QUERY_PARSER_INPUT value + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue()); + queryDNSTestRunner.assertNotValid(); + + // Note the presence of a QUERY_PARSER_INPUT value + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue()); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|"); + queryDNSTestRunner.assertValid(); + + // Note the presence of a QUERY_PARSER_INPUT value while NONE is set + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.NONE.getValue()); + queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|"); + queryDNSTestRunner.assertNotValid(); + } + + + + + // Dummy pseudo-DNS responder + private Attributes craftResponse(InvocationOnMock invocation) { + Object[] arguments = invocation.getArguments(); + String[] queryType = (String[]) arguments[1]; + + // Create attribute + Attributes attrs = new BasicAttributes(true); + BasicAttribute attr; + + switch (queryType[0]) { + case "AAAA": + attr = new BasicAttribute("AAAA"); + attrs.put(attr); + break; + case "TXT": + attr = new BasicAttribute("TXT", "666 | 123.123.123.123/32 | Apache-NIFI | AU | nifi.org | Apache NiFi"); + attrs.put(attr); + break; + case "PTR": + attr = new BasicAttribute("PTR"); + attr.add(0, "eg-apache.nifi.org."); + attr.add(1, "apache.nifi.org."); + attrs.put(attr); + break; + } + return attrs; + } +} +