NIFI-9903: This closes #5955. When only the 'success' relationship is used for LookupRecord, look up records until a match is found in order to determine the resultant schema. Refactored code to eliminate AbstractRouteRecord, because LookupRecord was the last processor that extended it. Refactored code to use an inner interface to clean up the code.

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2022-04-11 13:50:06 -04:00 committed by Joe Witt
parent 3034f2637a
commit 05f3d7510f
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 475 additions and 382 deletions

View File

@@ -1,243 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Abstract base Processor for Record-oriented routing: each Record read from an incoming
 * FlowFile is passed to {@link #route} and written to a child FlowFile for every Relationship
 * returned. One child FlowFile is created per Relationship that receives at least one Record.
 * Subclasses supply the routing decision, an optional per-FlowFile context, and whether the
 * original FlowFile should be transferred to {@link #REL_ORIGINAL} or dropped.
 *
 * @param <T> type of the per-FlowFile context produced by {@link #getFlowFileContext} and
 *            handed to every {@link #route} invocation for that FlowFile
 */
public abstract class AbstractRouteRecord<T> extends AbstractProcessor {

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Specifies the Controller Service to use for reading incoming data")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();

    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Specifies the Controller Service to use for writing out the records")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
            + "the unchanged FlowFile will be routed to this relationship")
        .build();

    static final Relationship REL_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description("Once a FlowFile has been processed and any derivative FlowFiles have been transferred, the original FlowFile will be transferred to this relationship.")
        .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        // 'original' is only exposed when the subclass opts in via isRouteOriginal().
        final Set<Relationship> relationships = new HashSet<>();
        if (isRouteOriginal()) {
            relationships.add(REL_ORIGINAL);
        }
        relationships.add(REL_FAILURE);
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Let the subclass derive its per-FlowFile context first; any failure routes the
        // unmodified FlowFile to 'failure' before any child FlowFiles are created.
        final T flowFileContext;
        try {
            flowFileContext = getFlowFileContext(flowFile, context);
        } catch (final Exception e) {
            getLogger().error("Failed to process {}; routing to failure", new Object[] {flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

        final AtomicInteger numRecords = new AtomicInteger(0);
        // One child FlowFile + writer pair per Relationship that receives at least one Record.
        final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();

        final FlowFile original = flowFile;
        final Map<String, String> originalAttributes = original.getAttributes();
        try {
            session.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
                        // The first Record is read eagerly so that its schema can be used to
                        // resolve the write schema before any Record is routed.
                        final Record firstRecord = reader.nextRecord();
                        if (firstRecord == null) {
                            getLogger().info("{} has no Records, so routing just the original FlowFile to 'original'", new Object[] {original});
                            return;
                        }

                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
                        final Set<Relationship> firstRecordRelationships = route(firstRecord, writeSchema, original, context, flowFileContext);
                        // BUG FIX: the first Record was previously never counted, so the
                        // 'record.count' attribute and the counters under-reported by one.
                        numRecords.incrementAndGet();
                        for (final Relationship relationship : firstRecordRelationships) {
                            writeRecord(firstRecord, relationship, writers, session, original, originalAttributes, writerFactory);
                        }

                        Record record;
                        while ((record = reader.nextRecord()) != null) {
                            final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
                            numRecords.incrementAndGet();

                            for (final Relationship relationship : relationships) {
                                writeRecord(record, relationship, writers, session, original, originalAttributes, writerFactory);
                            }
                        }
                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException("Could not parse incoming data", e);
                    }
                }
            });

            // Finish each child FlowFile: close its writer, apply bookkeeping attributes, and
            // transfer it to its Relationship.
            for (final Map.Entry<Relationship, Tuple<FlowFile, RecordSetWriter>> entry : writers.entrySet()) {
                final Relationship relationship = entry.getKey();
                final Tuple<FlowFile, RecordSetWriter> tuple = entry.getValue();
                final RecordSetWriter writer = tuple.getValue();
                FlowFile childFlowFile = tuple.getKey();

                final WriteResult writeResult = writer.finishRecordSet();

                try {
                    writer.close();
                } catch (final IOException ioe) {
                    getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
                }

                final Map<String, String> attributes = new HashMap<>();
                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.putAll(writeResult.getAttributes());

                childFlowFile = session.putAllAttributes(childFlowFile, attributes);
                session.transfer(childFlowFile, relationship);
                session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
                session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false);
                session.getProvenanceReporter().route(childFlowFile, relationship);
            }
        } catch (final Exception e) {
            getLogger().error("Failed to process {}", new Object[] {flowFile, e});

            // Discard any partially-written children; the original goes to 'failure' intact.
            for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
                try {
                    tuple.getValue().close();
                } catch (final Exception e1) {
                    getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", new Object[] {tuple.getKey()});
                }

                session.remove(tuple.getKey());
            }

            session.transfer(flowFile, REL_FAILURE);
            return;
        } finally {
            // Safety net: writers already closed above may be closed again here; NOTE(review):
            // this assumes RecordSetWriter.close() tolerates a second call — confirm.
            for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
                final RecordSetWriter writer = tuple.getValue();

                try {
                    writer.close();
                } catch (final Exception e) {
                    getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", new Object[] {tuple.getKey(), e});
                }
            }
        }

        if (isRouteOriginal()) {
            flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(numRecords));
            session.transfer(flowFile, REL_ORIGINAL);
        } else {
            session.remove(flowFile);
        }

        getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[] {flowFile, writers.size(), numRecords});
    }

    /**
     * Lazily creates (or reuses) the child FlowFile and RecordSetWriter for the given
     * Relationship, then writes the Record to it.
     */
    private void writeRecord(final Record record, final Relationship relationship, final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers, final ProcessSession session,
        final FlowFile original, final Map<String, String> originalAttributes, final RecordSetWriterFactory writerFactory) throws IOException, SchemaNotFoundException {

        final RecordSetWriter recordSetWriter;
        Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
        if (tuple == null) {
            final FlowFile outFlowFile = session.create(original);
            final OutputStream out = session.write(outFlowFile);

            final RecordSchema recordWriteSchema = writerFactory.getSchema(originalAttributes, record.getSchema());
            recordSetWriter = writerFactory.createWriter(getLogger(), recordWriteSchema, out, outFlowFile);
            recordSetWriter.beginRecordSet();

            tuple = new Tuple<>(outFlowFile, recordSetWriter);
            writers.put(relationship, tuple);
        } else {
            recordSetWriter = tuple.getValue();
        }

        recordSetWriter.write(record);
    }

    /**
     * Determines the Relationships to which the given Record should be routed.
     */
    protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context, T flowFileContext);

    /**
     * @return true if the original FlowFile should be transferred to {@link #REL_ORIGINAL}
     *         after processing, false if it should be dropped
     */
    protected abstract boolean isRouteOriginal();

    /**
     * Computes the per-FlowFile context that is passed to every {@link #route} call for a
     * given FlowFile.
     */
    protected abstract T getFlowFileContext(FlowFile flowFile, ProcessContext context);
}

View File

@ -35,15 +35,28 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService; import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult; import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache; import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator; import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
@ -52,6 +65,9 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -89,9 +105,9 @@ import java.util.stream.Collectors;
description = "A RecordPath that points to the field whose value will be looked up in the configured Lookup Service") description = "A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, @SeeAlso(value = {ConvertRecord.class, SplitRecord.class},
classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService", "org.apache.nifi.lookup.db.DatabaseRecordLookupService"}) classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService", "org.apache.nifi.lookup.db.DatabaseRecordLookupService"})
public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPath>, RecordPath>> { public class LookupRecord extends AbstractProcessor {
private volatile RecordPathCache recordPathCache = new RecordPathCache(25); private final RecordPathCache recordPathCache = new RecordPathCache(25);
private volatile LookupService<?> lookupService; private volatile LookupService<?> lookupService;
static final AllowableValue ROUTE_TO_SUCCESS = new AllowableValue("route-to-success", "Route to 'success'", static final AllowableValue ROUTE_TO_SUCCESS = new AllowableValue("route-to-success", "Route to 'success'",
@ -113,6 +129,22 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
+ "Service and the value returned by the Lookup Service will be used to replace the existing value. It is possible to configure multiple dynamic properties " + "Service and the value returned by the Lookup Service will be used to replace the existing value. It is possible to configure multiple dynamic properties "
+ "to replace multiple values in one execution. This strategy only supports simple types replacements (strings, integers, etc)."); + "to replace multiple values in one execution. This strategy only supports simple types replacements (strings, integers, etc).");
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading incoming data")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing out the records")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder() static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
.name("lookup-service") .name("lookup-service")
.displayName("Lookup Service") .displayName("Lookup Service")
@ -174,6 +206,10 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
.name("success") .name("success")
.description("All records will be sent to this Relationship if configured to do so, unless a failure occurs") .description("All records will be sent to this Relationship if configured to do so, unless a failure occurs")
.build(); .build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile cannot be enriched, the unchanged FlowFile will be routed to this relationship")
.build();
private static final Set<Relationship> MATCHED_COLLECTION = Collections.singleton(REL_MATCHED); private static final Set<Relationship> MATCHED_COLLECTION = Collections.singleton(REL_MATCHED);
private static final Set<Relationship> UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED); private static final Set<Relationship> UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED);
@ -195,7 +231,8 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.addAll(super.getSupportedPropertyDescriptors()); properties.add(RECORD_READER);
properties.add(RECORD_WRITER);
properties.add(LOOKUP_SERVICE); properties.add(LOOKUP_SERVICE);
properties.add(RESULT_RECORD_PATH); properties.add(RESULT_RECORD_PATH);
properties.add(ROUTING_STRATEGY); properties.add(ROUTING_STRATEGY);
@ -291,177 +328,375 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
} }
} }
@Override
protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
// Reads every Record from the incoming FlowFile, performs the configured lookup for each via
// the ReplacementStrategy, and writes each Record to a child FlowFile per routed Relationship.
// The original FlowFile is always dropped on success; on failure it is routed to 'failure'.
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final FlowFile original = flowFile;
final Map<String, String> originalAttributes = original.getAttributes();
final LookupContext lookupContext = createLookupContext(flowFile, context, session, writerFactory);
final ReplacementStrategy replacementStrategy = createReplacementStrategy(context);
// Determine the enriched schema up front (per the commit message, this may scan records until
// a lookup match is found); any failure here routes the unchanged FlowFile to 'failure'.
final RecordSchema enrichedSchema;
try {
enrichedSchema = replacementStrategy.determineResultSchema(readerFactory, writerFactory, context, session, flowFile, lookupContext);
} catch (final Exception e) {
getLogger().error("Could not determine schema to use for enriched FlowFiles", e);
session.transfer(original, REL_FAILURE);
return;
}
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
// Cache the resolved write schema per Relationship so it is computed only once.
final Map<Relationship, RecordSchema> writeSchemas = new HashMap<>();
Record record;
while ((record = reader.nextRecord()) != null) {
final Set<Relationship> relationships = replacementStrategy.lookup(record, context, lookupContext);
for (final Relationship relationship : relationships) {
// Determine the Write Schema to use for each relationship
RecordSchema writeSchema = writeSchemas.get(relationship);
if (writeSchema == null) {
// Fall back to the record's own schema when no enriched schema was determined.
final RecordSchema outputSchema = enrichedSchema == null ? record.getSchema() : enrichedSchema;
writeSchema = writerFactory.getSchema(originalAttributes, outputSchema);
writeSchemas.put(relationship, writeSchema);
}
final RecordSetWriter writer = lookupContext.getRecordWriterForRelationship(relationship, writeSchema);
writer.write(record);
}
}
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
}
}
});
// Finish each child FlowFile: close its writer, apply record.count/mime.type attributes,
// and transfer it to its Relationship with counters and provenance recorded.
for (final Relationship relationship : lookupContext.getRelationshipsUsed()) {
final RecordSetWriter writer = lookupContext.getExistingRecordWriterForRelationship(relationship);
FlowFile childFlowFile = lookupContext.getFlowFileForRelationship(relationship);
final WriteResult writeResult = writer.finishRecordSet();
try {
writer.close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
}
final Map<String, String> attributes = new HashMap<>();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
childFlowFile = session.putAllAttributes(childFlowFile, attributes);
session.transfer(childFlowFile, relationship);
session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false);
session.getProvenanceReporter().route(childFlowFile, relationship);
}
} catch (final Exception e) {
getLogger().error("Failed to process {}", new Object[]{flowFile, e});
// Discard any partially-written child FlowFiles; the original goes to 'failure' intact.
for (final Relationship relationship : lookupContext.getRelationshipsUsed()) {
final RecordSetWriter writer = lookupContext.getExistingRecordWriterForRelationship(relationship);
final FlowFile childFlowFile = lookupContext.getFlowFileForRelationship(relationship);
try {
writer.close();
} catch (final Exception e1) {
getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", writer);
}
session.remove(childFlowFile);
}
session.transfer(flowFile, REL_FAILURE);
return;
} finally {
// Safety net: writers may already have been closed above; NOTE(review): assumes a second
// close() on a RecordSetWriter is tolerated -- confirm.
for (final Relationship relationship : lookupContext.getRelationshipsUsed()) {
final RecordSetWriter writer = lookupContext.getExistingRecordWriterForRelationship(relationship);
try {
writer.close();
} catch (final Exception e) {
getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", writer, e);
}
}
}
// The original FlowFile is always dropped; its content lives on in the child FlowFiles.
session.remove(flowFile);
getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records",
flowFile, lookupContext.getRelationshipsUsed().size(), replacementStrategy.getLookupCount());
}
private ReplacementStrategy createReplacementStrategy(final ProcessContext context) {
final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue()); final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue());
if(isInPlaceReplacement) { if (isInPlaceReplacement) {
return doInPlaceReplacement(record, flowFile, context, flowFileContext); return new InPlaceReplacementStrategy();
} else { } else {
return doResultPathReplacement(record, flowFile, context, flowFileContext); return new RecordPathReplacementStrategy();
}
}
private Set<Relationship> doInPlaceReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
final String coordinateKey = lookupService.getRequiredKeys().iterator().next();
boolean hasUnmatchedValue = false;
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
final RecordPath recordPath = entry.getValue();
final RecordPathResult pathResult = recordPath.evaluate(record);
final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
.filter(fieldVal -> fieldVal.getValue() != null)
.collect(Collectors.toList());
if (lookupFieldValues.isEmpty()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
return rels;
}
for (FieldValue fieldValue : lookupFieldValues) {
final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
lookupCoordinates.clear();
lookupCoordinates.put(coordinateKey, coordinateValue);
final Optional<?> lookupValueOption;
try {
lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}
if (!lookupValueOption.isPresent()) {
hasUnmatchedValue = true;
continue;
}
final Object lookupValue = lookupValueOption.get();
final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
fieldValue.updateValue(lookupValue, inferredDataType);
}
}
if (hasUnmatchedValue) {
return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
} else {
return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
} }
} }
private Set<Relationship> doResultPathReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) { private class InPlaceReplacementStrategy implements ReplacementStrategy {
final String coordinateKey = entry.getKey(); private int lookupCount = 0;
final RecordPath recordPath = entry.getValue();
final RecordPathResult pathResult = recordPath.evaluate(record); @Override
final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields() public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) {
.filter(fieldVal -> fieldVal.getValue() != null) lookupCount++;
.collect(Collectors.toList());
if (lookupFieldValues.isEmpty()) { final Map<String, RecordPath> recordPaths = lookupContext.getRecordPathsByCoordinateKey();
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
final String coordinateKey = lookupService.getRequiredKeys().iterator().next();
final FlowFile flowFile = lookupContext.getOriginalFlowFile();
boolean hasUnmatchedValue = false;
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
final RecordPath recordPath = entry.getValue();
final RecordPathResult pathResult = recordPath.evaluate(record);
final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
.filter(fieldVal -> fieldVal.getValue() != null)
.collect(Collectors.toList());
if (lookupFieldValues.isEmpty()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
return rels;
}
for (final FieldValue fieldValue : lookupFieldValues) {
final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
lookupCoordinates.clear();
lookupCoordinates.put(coordinateKey, coordinateValue);
final Optional<?> lookupValueOption;
try {
lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}
if (!lookupValueOption.isPresent()) {
hasUnmatchedValue = true;
continue;
}
final Object lookupValue = lookupValueOption.get();
final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
fieldValue.updateValue(lookupValue, inferredDataType);
}
}
if (hasUnmatchedValue) {
return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
} else {
return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
}
}
@Override
public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext context, final ProcessSession session,
final FlowFile flowFile, final LookupContext lookupContext) throws IOException, SchemaNotFoundException, MalformedRecordException {
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
return reader.getSchema();
}
}
@Override
public int getLookupCount() {
return lookupCount;
}
}
private class RecordPathReplacementStrategy implements ReplacementStrategy {
private int lookupCount = 0;
@Override
public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) {
lookupCount++;
final Map<String, Object> lookupCoordinates = createLookupCoordinates(record, lookupContext, true);
if (lookupCoordinates.isEmpty()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
return rels; return rels;
} }
if (lookupFieldValues.size() > 1) { final FlowFile flowFile = lookupContext.getOriginalFlowFile();
final Optional<?> lookupValueOption;
try {
lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}
if (!lookupValueOption.isPresent()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("RecordPath for property '{}' matched {} fields in a record for {}; routing record to {}",
new Object[] {coordinateKey, lookupFieldValues.size(), flowFile, rels});
return rels; return rels;
} }
final FieldValue fieldValue = lookupFieldValues.get(0); applyLookupResult(record, context, lookupContext, lookupValueOption.get());
final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
lookupCoordinates.put(coordinateKey, coordinateValue);
}
final Optional<?> lookupValueOption; final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
try {
lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}
if (!lookupValueOption.isPresent()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
return rels; return rels;
} }
// Ensure that the Record has the appropriate schema to account for the newly added values private void applyLookupResult(final Record record, final ProcessContext context, final LookupContext lookupContext, final Object lookupValue) {
final RecordPath resultPath = flowFileContext.getValue(); // Ensure that the Record has the appropriate schema to account for the newly added values
if (resultPath != null) { final RecordPath resultPath = lookupContext.getResultRecordPath();
final Object lookupValue = lookupValueOption.get(); if (resultPath != null) {
final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record); final RecordPathResult resultPathResult = resultPath.evaluate(record);
final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue(); final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue();
if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) { if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
final Record lookupRecord = (Record) lookupValue; final Record lookupRecord = (Record) lookupValue;
// User wants to add all fields of the resultant Record to the specified Record Path. // User wants to add all fields of the resultant Record to the specified Record Path.
// If the destination Record Path returns to us a Record, then we will add all field values of // If the destination Record Path returns to us a Record, then we will add all field values of
// the Lookup Record to the destination Record. However, if the destination Record Path returns // the Lookup Record to the destination Record. However, if the destination Record Path returns
// something other than a Record, then we can't add the fields to it. We can only replace it, // something other than a Record, then we can't add the fields to it. We can only replace it,
// because it doesn't make sense to add fields to anything but a Record. // because it doesn't make sense to add fields to anything but a Record.
resultPathResult.getSelectedFields().forEach(fieldVal -> { resultPathResult.getSelectedFields().forEach(fieldVal -> {
final Object destinationValue = fieldVal.getValue(); final Object destinationValue = fieldVal.getValue();
if (destinationValue instanceof Record) { if (destinationValue instanceof Record) {
final Record destinationRecord = (Record) destinationValue; final Record destinationRecord = (Record) destinationValue;
for (final String fieldName : lookupRecord.getRawFieldNames()) { for (final String fieldName : lookupRecord.getRawFieldNames()) {
final Object value = lookupRecord.getValue(fieldName); final Object value = lookupRecord.getValue(fieldName);
final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName); final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
if (recordFieldOption.isPresent()) { if (recordFieldOption.isPresent()) {
// Even if the looked up field is not nullable, if the lookup key didn't match with any record, // Even if the looked up field is not nullable, if the lookup key didn't match with any record,
// and matched/unmatched records are written to the same FlowFile routed to 'success' relationship, // and matched/unmatched records are written to the same FlowFile routed to 'success' relationship,
// then enriched fields should be nullable to support unmatched records whose enriched fields will be null. // then enriched fields should be nullable to support unmatched records whose enriched fields will be null.
RecordField field = recordFieldOption.get(); RecordField field = recordFieldOption.get();
if (!routeToMatchedUnmatched && !field.isNullable()) { if (!routeToMatchedUnmatched && !field.isNullable()) {
field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true); field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true);
}
destinationRecord.setValue(field, value);
} else {
destinationRecord.setValue(fieldName, value);
} }
destinationRecord.setValue(field, value);
} else {
destinationRecord.setValue(fieldName, value);
} }
} else {
final Optional<Record> parentOption = fieldVal.getParentRecord();
parentOption.ifPresent(parent -> parent.setValue(fieldVal.getField(), lookupRecord));
} }
} else { });
final Optional<Record> parentOption = fieldVal.getParentRecord(); } else {
parentOption.ifPresent(parent -> parent.setValue(fieldVal.getField(), lookupRecord)); final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
} resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
}); }
} else {
final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
}
record.incorporateInactiveFields(); record.incorporateInactiveFields();
}
} }
final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION; @Override
return rels; public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext context, final ProcessSession session,
final FlowFile flowFile, final LookupContext lookupContext)
throws IOException, SchemaNotFoundException, MalformedRecordException, LookupFailureException {
final Map<String, String> flowFileAttributes = flowFile.getAttributes();
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
while ((record = reader.nextRecord()) != null) {
final Map<String, Object> lookupCoordinates = createLookupCoordinates(record, lookupContext, false);
if (lookupCoordinates.isEmpty()) {
continue;
}
final Optional<?> lookupResult = lookupService.lookup(lookupCoordinates, flowFileAttributes);
if (!lookupResult.isPresent()) {
continue;
}
applyLookupResult(record, context, lookupContext, lookupResult.get());
getLogger().debug("Found a Record for {} that returned a result from the LookupService. Will provide the following schema to the Writer: {}", flowFile, record.getSchema());
return record.getSchema();
}
getLogger().debug("Found no Record for {} that returned a result from the LookupService. Will provider Reader's schema to the Writer.", flowFile);
return reader.getSchema();
}
}
private Map<String, Object> createLookupCoordinates(final Record record, final LookupContext lookupContext, final boolean logIfNotMatched) {
final Map<String, RecordPath> recordPaths = lookupContext.getRecordPathsByCoordinateKey();
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
final FlowFile flowFile = lookupContext.getOriginalFlowFile();
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
final String coordinateKey = entry.getKey();
final RecordPath recordPath = entry.getValue();
final RecordPathResult pathResult = recordPath.evaluate(record);
final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
.filter(fieldVal -> fieldVal.getValue() != null)
.collect(Collectors.toList());
if (lookupFieldValues.isEmpty()) {
if (logIfNotMatched) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", coordinateKey, flowFile, rels);
}
return Collections.emptyMap();
}
if (lookupFieldValues.size() > 1) {
if (logIfNotMatched) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("RecordPath for property '{}' matched {} fields in a record for {}; routing record to {}",
coordinateKey, lookupFieldValues.size(), flowFile, rels);
}
return Collections.emptyMap();
}
final FieldValue fieldValue = lookupFieldValues.get(0);
final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
lookupCoordinates.put(coordinateKey, coordinateValue);
}
return lookupCoordinates;
}
@Override
public int getLookupCount() {
return lookupCount;
}
} }
@Override
protected boolean isRouteOriginal() {
return false;
}
@Override protected LookupContext createLookupContext(final FlowFile flowFile, final ProcessContext context, final ProcessSession session, final RecordSetWriterFactory writerFactory) {
protected Tuple<Map<String, RecordPath>, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
final Map<String, RecordPath> recordPaths = new HashMap<>(); final Map<String, RecordPath> recordPaths = new HashMap<>();
for (final PropertyDescriptor prop : context.getProperties().keySet()) { for (final PropertyDescriptor prop : context.getProperties().keySet()) {
if (!prop.isDynamic()) { if (!prop.isDynamic()) {
@ -481,7 +716,90 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
resultRecordPath = null; resultRecordPath = null;
} }
return new Tuple<>(recordPaths, resultRecordPath); return new LookupContext(recordPaths, resultRecordPath, session, flowFile, writerFactory, getLogger());
} }
private interface ReplacementStrategy {
Set<Relationship> lookup(Record record, ProcessContext context, LookupContext lookupContext);
RecordSchema determineResultSchema(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, ProcessContext context, ProcessSession session, FlowFile flowFile,
LookupContext lookupContext) throws IOException, SchemaNotFoundException, MalformedRecordException, LookupFailureException;
int getLookupCount();
}
private static class LookupContext {
private final Map<String, RecordPath> recordPathsByCoordinateKey;
private final RecordPath resultRecordPath;
private final ProcessSession session;
private final FlowFile flowFile;
private final RecordSetWriterFactory writerFactory;
private final ComponentLog logger;
private final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writersByRelationship = new HashMap<>();
public LookupContext(final Map<String, RecordPath> recordPathsByCoordinateKey, final RecordPath resultRecordPath, final ProcessSession session, final FlowFile flowFile,
final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
this.recordPathsByCoordinateKey = recordPathsByCoordinateKey;
this.resultRecordPath = resultRecordPath;
this.session = session;
this.flowFile = flowFile;
this.writerFactory = writerFactory;
this.logger = logger;
}
public Map<String, RecordPath> getRecordPathsByCoordinateKey() {
return recordPathsByCoordinateKey;
}
public RecordPath getResultRecordPath() {
return resultRecordPath;
}
public FlowFile getOriginalFlowFile() {
return flowFile;
}
private Set<Relationship> getRelationshipsUsed() {
return writersByRelationship.keySet();
}
public FlowFile getFlowFileForRelationship(final Relationship relationship) {
final Tuple<FlowFile, RecordSetWriter> tuple = writersByRelationship.get(relationship);
return tuple.getKey();
}
public RecordSetWriter getExistingRecordWriterForRelationship(final Relationship relationship) {
final Tuple<FlowFile, RecordSetWriter> tuple = writersByRelationship.get(relationship);
return tuple.getValue();
}
public RecordSetWriter getRecordWriterForRelationship(final Relationship relationship, final RecordSchema schema) throws IOException, SchemaNotFoundException {
final Tuple<FlowFile, RecordSetWriter> tuple = writersByRelationship.get(relationship);
if (tuple != null) {
return tuple.getValue();
}
final FlowFile outFlowFile = session.create(flowFile);
final OutputStream out = session.write(outFlowFile);
try {
final RecordSchema recordWriteSchema = writerFactory.getSchema(flowFile.getAttributes(), schema);
final RecordSetWriter recordSetWriter = writerFactory.createWriter(logger, recordWriteSchema, out, outFlowFile);
recordSetWriter.beginRecordSet();
writersByRelationship.put(relationship, new Tuple<>(outFlowFile, recordSetWriter));
return recordSetWriter;
} catch (final Exception e) {
try {
out.close();
} catch (final Exception e1) {
e.addSuppressed(e1);
}
throw e;
}
}
}
} }

View File

@ -194,6 +194,24 @@ public class TestLookupRecord {
unmatched.assertContentEquals("Jane Doe,47,\n"); unmatched.assertContentEquals("Jane Doe,47,\n");
} }
@Test
public void testAllMatchButFirstRouteToSuccess() {
lookupService.addValue("Jane Doe", "Soccer");
lookupService.addValue("Jimmy Doe", "Football");
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
runner.enqueue("");
runner.run();
runner.assertTransferCount(LookupRecord.REL_FAILURE, 0);
runner.assertTransferCount(LookupRecord.REL_SUCCESS, 1);
final MockFlowFile matched = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
matched.assertAttributeEquals("record.count", "3");
matched.assertAttributeEquals("mime.type", "text/plain");
matched.assertContentEquals("John Doe,48,\nJane Doe,47,Soccer\nJimmy Doe,14,Football\n");
}
@Test @Test
public void testResultPathNotFound() { public void testResultPathNotFound() {