NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in the input, instead of requiring that the 'wrapper record' returned from Lookup be included

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2110.
This commit is contained in:
Mark Payne 2017-08-25 16:22:43 -04:00 committed by James Wing
parent 20a6374bf7
commit bfd6c0aef7
2 changed files with 224 additions and 5 deletions

View File

@ -95,6 +95,11 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
"Records will be routed to either a 'matched' or an 'unmatched' Relationship depending on whether or not there was a match in the configured Lookup Service. "
+ "A single input FlowFile may result in two different output FlowFiles.");
static final AllowableValue RESULT_ENTIRE_RECORD = new AllowableValue("insert-entire-record", "Insert Entire Record",
"The entire Record that is retrieved from the Lookup Service will be inserted into the destination path.");
static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields",
"All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path.");
static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
.name("lookup-service")
.displayName("Lookup Service")
@ -114,6 +119,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
.required(false)
.build();
static final PropertyDescriptor RESULT_CONTENTS = new PropertyDescriptor.Builder()
.name("result-contents")
.displayName("Record Result Contents")
.description("When a result is obtained that contains a Record, this property determines whether the Record itself is inserted at the configured "
+ "path or if the contents of the Record (i.e., the sub-fields) will be inserted at the configured path.")
.allowableValues(RESULT_ENTIRE_RECORD, RESULT_RECORD_FIELDS)
.defaultValue(RESULT_ENTIRE_RECORD.getValue())
.required(true)
.build();
static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder()
.name("routing-strategy")
.displayName("Routing Strategy")
@ -161,6 +176,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
properties.add(LOOKUP_SERVICE);
properties.add(RESULT_RECORD_PATH);
properties.add(ROUTING_STRATEGY);
properties.add(RESULT_CONTENTS);
return properties;
}
@ -272,14 +288,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
lookupCoordinates.put(coordinateKey, coordinateValue);
}
final Optional<?> lookupValue;
final Optional<?> lookupValueOption;
try {
lookupValue = lookupService.lookup(lookupCoordinates);
lookupValueOption = lookupService.lookup(lookupCoordinates);
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}
if (!lookupValue.isPresent()) {
if (!lookupValueOption.isPresent()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
return rels;
}
@ -289,9 +305,39 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
if (resultPath != null) {
record.incorporateSchema(writeSchema);
final Object replacementValue = lookupValue.get();
final Object lookupValue = lookupValueOption.get();
final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue();
if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
final Record lookupRecord = (Record) lookupValue;
// Use wants to add all fields of the resultant Record to the specified Record Path.
// If the destination Record Path returns to us a Record, then we will add all field values of
// the Lookup Record to the destination Record. However, if the destination Record Path returns
// something other than a Record, then we can't add the fields to it. We can only replace it,
// because it doesn't make sense to add fields to anything but a Record.
resultPathResult.getSelectedFields().forEach(fieldVal -> {
final Object destinationValue = fieldVal.getValue();
if (destinationValue instanceof Record) {
final Record destinationRecord = (Record) destinationValue;
for (final String fieldName : lookupRecord.getRawFieldNames()) {
final Object value = lookupRecord.getValue(fieldName);
destinationRecord.setValue(fieldName, value);
}
} else {
final Optional<Record> parentOption = fieldVal.getParentRecord();
if (parentOption.isPresent()) {
parentOption.get().setValue(fieldVal.getField().getFieldName(), lookupRecord);
}
}
});
} else {
resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue));
}
}
final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;

View File

@ -17,19 +17,30 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -227,6 +238,137 @@ public class TestLookupRecord {
}
@Test
public void testAddFieldsToExistingRecord() throws InitializationException, IOException {
final RecordLookup lookupService = new RecordLookup();
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Record sports = new MapRecord(schema, new HashMap<String, Object>());
sports.setValue("favorite", "basketball");
sports.setValue("least", "soccer");
lookupService.addValue("John Doe", sports);
recordReader = new MockRecordParser();
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("favorite", RecordFieldType.STRING);
recordReader.addSchemaField("least", RecordFieldType.STRING);
recordReader.addRecord("John Doe", 48, null, "baseball");
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty("lookup", "/name");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
runner.enqueue("");
runner.run();
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
out.assertContentEquals("John Doe,48,basketball,soccer\n");
}
/**
* If the output fields are added to a record that doesn't exist, the result should be that a Record is
* created and the results added to it.
*/
@Test
public void testAddFieldsToNonExistentRecord() throws InitializationException {
final RecordLookup lookupService = new RecordLookup();
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Record sports = new MapRecord(schema, new HashMap<String, Object>());
sports.setValue("favorite", "basketball");
sports.setValue("least", "soccer");
lookupService.addValue("John Doe", sports);
recordReader = new MockRecordParser();
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.RECORD);
recordReader.addRecord("John Doe", 48, null);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty("lookup", "/name");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
runner.enqueue("");
runner.run();
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
// We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first
final String outputContents = new String(out.toByteArray());
assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
|| outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
}
/**
* If the output fields are added to a non-record field, then the result should be that the field
* becomes a UNION that does allow the Record and the value is set to a Record.
*/
@Test
public void testAddFieldsToNonRecordField() throws InitializationException {
final RecordLookup lookupService = new RecordLookup();
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Record sports = new MapRecord(schema, new HashMap<String, Object>());
sports.setValue("favorite", "basketball");
sports.setValue("least", "soccer");
lookupService.addValue("John Doe", sports);
recordReader = new MockRecordParser();
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.STRING);
recordReader.addRecord("John Doe", 48, null);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty("lookup", "/name");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
runner.enqueue("");
runner.run();
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
// We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first
final String outputContents = new String(out.toByteArray());
assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
|| outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
}
private static class MapLookup extends AbstractControllerService implements StringLookupService {
private final Map<String, String> values = new HashMap<>();
@ -260,4 +402,35 @@ public class TestLookupRecord {
}
}
private static class RecordLookup extends AbstractControllerService implements RecordLookupService {
private final Map<String, Record> values = new HashMap<>();
public void addValue(final String key, final Record value) {
values.put(key, value);
}
@Override
public Class<?> getValueType() {
return String.class;
}
@Override
public Optional<Record> lookup(final Map<String, String> coordinates) {
if (coordinates == null) {
return Optional.empty();
}
final String key = coordinates.get("lookup");
if (key == null) {
return Optional.empty();
}
return Optional.ofNullable(values.get(key));
}
@Override
public Set<String> getRequiredKeys() {
return Collections.singleton("lookup");
}
}
}