NIFI-5287 Made LookupRecord able to take in flowfile attributes and combine them with lookup keys.

NIFI-5287 Removed unneeded property descriptor.

NIFI-5287 Added additional changes from a code review.

This closes #2777.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mike Thomsen 2018-06-09 13:43:24 -04:00 committed by Koji Kawamura
parent dbf259508c
commit 0831059d2c
5 changed files with 127 additions and 14 deletions

View File

@ -222,7 +222,8 @@ public class LookupAttribute extends AbstractProcessor {
final PropertyValue lookupKeyExpression = e.getValue();
final String lookupKey = lookupKeyExpression.evaluateAttributeExpressions(flowFile).getValue();
final String attributeName = e.getKey().getName();
final Optional<String> attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey));
final Optional<String> attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey),
flowFile.getAttributes());
matched = putAttribute(attributeName, attributeValue, attributes, includeEmptyValues, logger) || matched;
if (!matched && logger.isDebugEnabled()) {

View File

@ -17,18 +17,6 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -61,6 +49,18 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.Tuple;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@EventDriven
@SideEffectFree
@ -292,7 +292,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
final Optional<?> lookupValueOption;
try {
lookupValueOption = lookupService.lookup(lookupCoordinates);
lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}

View File

@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.lookup.SimpleKeyValueLookupService;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.reporting.InitializationException;
@ -31,6 +33,7 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
@ -140,6 +143,27 @@ public class TestLookupAttribute {
runner.assertNotValid();
}
@Test
public void testLookupServicePassFlowfileAttributes() throws InitializationException {
final LookupService service = new TestService();
final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
runner.addControllerService("simple-key-value-lookup-service", service);
runner.enableControllerService(service);
runner.assertValid(service);
runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service");
runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "false");
runner.setProperty("baz", "${attr1}");
runner.assertValid();
final Map<String, String> attributes = new HashMap<>();
attributes.put("user_defined", "key4");
runner.enqueue("some content".getBytes(), attributes);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(LookupAttribute.REL_MATCHED, 1);
}
private static class InvalidLookupService extends AbstractControllerService implements StringLookupService {
@Override
public Optional<String> lookup(Map<String, Object> coordinates) {
@ -155,4 +179,32 @@ public class TestLookupAttribute {
}
}
static class TestService extends AbstractControllerService implements StringLookupService {
@Override
public Optional<String> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
Assert.assertNotNull(coordinates);
Assert.assertNotNull(context);
Assert.assertEquals(1, coordinates.size());
Assert.assertTrue(context.containsKey("user_defined"));
return Optional.of("Test!");
}
@Override
public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
return Optional.empty();
}
@Override
public Class<?> getValueType() {
return String.class;
}
@Override
public Set<String> getRequiredKeys() {
Set set = new HashSet();
set.add("key");
return set;
}
}
}

View File

@ -44,6 +44,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -84,6 +85,32 @@ public class TestLookupRecord {
recordReader.addRecord("Jimmy Doe", 14, null);
}
@Test
public void testFlowfileAttributesPassed() {
Map<String, String> attrs = new HashMap<>();
attrs.put("schema.name", "person");
attrs.put("something_something", "test");
Map<String, Object> expected = new HashMap<>();
expected.putAll(attrs);
lookupService.setExpectedContext(expected);
lookupService.addValue("John Doe", "Soccer");
lookupService.addValue("Jane Doe", "Basketball");
lookupService.addValue("Jimmy Doe", "Football");
runner.enqueue("", attrs);
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
out.assertContentEquals("John Doe,48,Soccer\nJane Doe,47,Basketball\nJimmy Doe,14,Football\n");
}
@Test
public void testAllMatch() throws InitializationException {
lookupService.addValue("John Doe", "Soccer");
@ -372,6 +399,7 @@ public class TestLookupRecord {
private static class MapLookup extends AbstractControllerService implements StringLookupService {
private final Map<String, String> values = new HashMap<>();
private Map<String, Object> expectedContext;
public void addValue(final String key, final String value) {
values.put(key, value);
@ -382,6 +410,11 @@ public class TestLookupRecord {
return String.class;
}
public Optional<String> lookup(final Map<String, Object> coordinates, Map<String, String> context) {
validateContext(context);
return lookup(coordinates);
}
@Override
public Optional<String> lookup(final Map<String, Object> coordinates) {
if (coordinates == null || coordinates.get("lookup") == null) {
@ -400,6 +433,20 @@ public class TestLookupRecord {
public Set<String> getRequiredKeys() {
return Collections.singleton("lookup");
}
public void setExpectedContext(Map<String, Object> expectedContext) {
this.expectedContext = expectedContext;
}
private void validateContext(Map<String, String> context) {
if (expectedContext != null) {
for (Map.Entry<String, Object> entry : expectedContext.entrySet()) {
Assert.assertTrue(String.format("%s was not in coordinates.", entry.getKey()),
context.containsKey(entry.getKey()));
Assert.assertEquals("Wrong value", entry.getValue(), context.get(entry.getKey()));
}
}
}
}
private static class RecordLookup extends AbstractControllerService implements RecordLookupService {

View File

@ -35,6 +35,19 @@ public interface LookupService<T> extends ControllerService {
*/
Optional<T> lookup(Map<String, Object> coordinates) throws LookupFailureException;
/**
* Looks up a value that corresponds to the given map, coordinates. Additional contextual information will also be passed into the
* map labeled context from sources such as flowfile attributes.
*
* @param coordinates a Map of key/value pairs that indicate the information that should be looked up
* @param context a Map of additional information
* @return a value that corresponds to the given coordinates
* @throws LookupFailureException if unable to lookup a value for the given coordinates
*/
default Optional<T> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
return lookup(coordinates);
}
/**
* @return the Class that represents the type of value that will be returned by {@link #lookup(Map)}
*/