mirror of https://github.com/apache/nifi.git
NIFI-3857: This closes #1825. Added PartitionRecord processor
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
9bd0246a96
commit
ae9953db64
|
@ -52,4 +52,26 @@ public class ArrayIndexFieldValue extends StandardFieldValue {
|
|||
public void updateValue(final Object newValue) {
|
||||
getParentRecord().get().setArrayValue(getField().getFieldName(), getArrayIndex(), newValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getValue(), getField(), getParent(), index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof ArrayIndexFieldValue)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final ArrayIndexFieldValue other = (ArrayIndexFieldValue) obj;
|
||||
return Objects.equals(getValue(), other.getValue()) && Objects.equals(getField(), other.getField())
|
||||
&& Objects.equals(getParent(), other.getParent()) && getArrayIndex() == other.getArrayIndex();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.nifi.record.path;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
|
||||
public class MapEntryFieldValue extends StandardFieldValue {
|
||||
|
@ -36,4 +38,25 @@ public class MapEntryFieldValue extends StandardFieldValue {
|
|||
getParentRecord().get().setMapValue(getField().getFieldName(), getMapKey(), newValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getValue(), getField(), getParent(), mapKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof MapEntryFieldValue)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final MapEntryFieldValue other = (MapEntryFieldValue) obj;
|
||||
return Objects.equals(getValue(), other.getValue()) && Objects.equals(getField(), other.getField())
|
||||
&& Objects.equals(getParent(), other.getParent()) && Objects.equals(getMapKey(), other.getMapKey());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,22 @@ public class StandardFieldValue implements FieldValue {
|
|||
return Objects.hash(value, field, parent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof StandardFieldValue)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final StandardFieldValue other = (StandardFieldValue) obj;
|
||||
return Objects.equals(getValue(), other.getValue()) && Objects.equals(getField(), other.getField()) && Objects.equals(getParent(), other.getParent());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (value instanceof Object[]) {
|
||||
|
|
|
@ -45,11 +45,16 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
|
|||
Record record;
|
||||
while ((record = recordSet.next()) != null) {
|
||||
write(record);
|
||||
recordCount++;
|
||||
}
|
||||
return finishRecordSet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final WriteResult write(final Record record) throws IOException {
|
||||
final Map<String, String> attributes = writeRecord(record);
|
||||
return WriteResult.of(++recordCount, attributes);
|
||||
}
|
||||
|
||||
protected OutputStream getOutputStream() {
|
||||
return out;
|
||||
}
|
||||
|
@ -102,4 +107,6 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
|
|||
protected Map<String, String> onFinishRecordSet() throws IOException {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
protected abstract Map<String, String> writeRecord(Record record) throws IOException;
|
||||
}
|
||||
|
|
|
@ -174,6 +174,10 @@ public class DataTypeUtils {
|
|||
public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
|
||||
for (final DataType subType : choiceType.getPossibleSubTypes()) {
|
||||
if (isCompatibleDataType(value, subType)) {
|
||||
if (subType.getFieldType() == RecordFieldType.CHOICE) {
|
||||
return chooseDataType(value, (ChoiceDataType) subType);
|
||||
}
|
||||
|
||||
return subType;
|
||||
}
|
||||
}
|
||||
|
@ -893,4 +897,30 @@ public class DataTypeUtils {
|
|||
|
||||
return new RecordField(fieldName, dataType, defaultValue, aliases);
|
||||
}
|
||||
|
||||
public static boolean isScalarValue(final DataType dataType, final Object value) {
|
||||
final RecordFieldType fieldType = dataType.getFieldType();
|
||||
|
||||
final RecordFieldType chosenType;
|
||||
if (fieldType == RecordFieldType.CHOICE) {
|
||||
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
|
||||
final DataType chosenDataType = chooseDataType(value, choiceDataType);
|
||||
if (chosenDataType == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
chosenType = chosenDataType.getFieldType();
|
||||
} else {
|
||||
chosenType = fieldType;
|
||||
}
|
||||
|
||||
switch (chosenType) {
|
||||
case ARRAY:
|
||||
case MAP:
|
||||
case RECORD:
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,8 +24,10 @@ import org.apache.avro.Schema;
|
|||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.avro.Schema.Type;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericData.Array;
|
||||
import org.apache.avro.generic.GenericFixed;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.specific.SpecificRecord;
|
||||
import org.apache.avro.util.Utf8;
|
||||
import org.apache.avro.JsonProperties;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
|
@ -278,6 +280,7 @@ public class AvroTypeUtil {
|
|||
return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) {
|
||||
if (rawValue == null) {
|
||||
return null;
|
||||
|
@ -465,9 +468,13 @@ public class AvroTypeUtil {
|
|||
final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
|
||||
try {
|
||||
final Object convertedValue = conversion.apply(nonNullFieldSchema);
|
||||
if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType)
|
||||
// For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
|
||||
|| (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
|
||||
|
||||
if (isCompatibleDataType(convertedValue, desiredDataType)) {
|
||||
return convertedValue;
|
||||
}
|
||||
|
||||
// For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
|
||||
if (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) {
|
||||
return convertedValue;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -484,6 +491,33 @@ public class AvroTypeUtil {
|
|||
return null;
|
||||
}
|
||||
|
||||
private static boolean isCompatibleDataType(final Object value, final DataType dataType) {
|
||||
if (value == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (dataType.getFieldType()) {
|
||||
case RECORD:
|
||||
if (value instanceof GenericRecord || value instanceof SpecificRecord) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
case STRING:
|
||||
if (value instanceof Utf8) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
case ARRAY:
|
||||
if (value instanceof Array) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return DataTypeUtils.isCompatibleDataType(value, dataType);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Convert an Avro object to a normal Java objects for further processing.
|
||||
* The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)}
|
||||
|
|
|
@ -63,12 +63,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
|
|||
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
|
||||
return new RecordSetWriter() {
|
||||
private int recordCount = 0;
|
||||
private boolean headerWritten = false;
|
||||
|
||||
@Override
|
||||
public WriteResult write(final RecordSet rs) throws IOException {
|
||||
if (header != null) {
|
||||
if (header != null && !headerWritten) {
|
||||
out.write(header.getBytes());
|
||||
out.write("\n".getBytes());
|
||||
headerWritten = true;
|
||||
}
|
||||
|
||||
int recordCount = 0;
|
||||
|
@ -110,9 +112,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
|
|||
|
||||
@Override
|
||||
public WriteResult write(Record record) throws IOException {
|
||||
if (header != null) {
|
||||
if (++recordCount > failAfterN && failAfterN > -1) {
|
||||
throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
|
||||
}
|
||||
|
||||
if (header != null && !headerWritten) {
|
||||
out.write(header.getBytes());
|
||||
out.write("\n".getBytes());
|
||||
headerWritten = true;
|
||||
}
|
||||
|
||||
final int numCols = record.getSchema().getFieldCount();
|
||||
|
@ -135,8 +142,6 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
|
|||
}
|
||||
out.write("\n".getBytes());
|
||||
|
||||
recordCount++;
|
||||
|
||||
return WriteResult.of(1, Collections.emptyMap());
|
||||
}
|
||||
|
||||
|
|
|
@ -129,7 +129,6 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
|
|||
}
|
||||
|
||||
} catch (final Exception ex) {
|
||||
ex.printStackTrace();
|
||||
final ComponentLog logger = getLogger();
|
||||
final String message = "Unable to load script: " + ex.getLocalizedMessage();
|
||||
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
|
|||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
||||
public abstract class AbstractRecordProcessor extends AbstractProcessor {
|
||||
|
||||
|
@ -120,39 +119,18 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
|
|||
@Override
|
||||
public void process(final InputStream in, final OutputStream out) throws IOException {
|
||||
|
||||
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
|
||||
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out);
|
||||
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger());
|
||||
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
|
||||
|
||||
final RecordSet recordSet = new RecordSet() {
|
||||
@Override
|
||||
public RecordSchema getSchema() throws IOException {
|
||||
try {
|
||||
return reader.getSchema();
|
||||
} catch (final MalformedRecordException e) {
|
||||
throw new IOException(e);
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
writer.beginRecordSet();
|
||||
|
||||
@Override
|
||||
public Record next() throws IOException {
|
||||
try {
|
||||
final Record record = reader.nextRecord();
|
||||
if (record == null) {
|
||||
return null;
|
||||
}
|
||||
Record record;
|
||||
while ((record = reader.nextRecord()) != null) {
|
||||
final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context);
|
||||
writer.write(processed);
|
||||
}
|
||||
|
||||
return AbstractRecordProcessor.this.process(record, writeSchema, original, context);
|
||||
} catch (final MalformedRecordException e) {
|
||||
throw new IOException(e);
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
final WriteResult writeResult = writer.write(recordSet);
|
||||
final WriteResult writeResult = writer.finishRecordSet();
|
||||
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
|
||||
attributes.putAll(writeResult.getAttributes());
|
||||
|
|
|
@ -101,7 +101,14 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
final T flowFileContext = getFlowFileContext(flowFile, context);
|
||||
final T flowFileContext;
|
||||
try {
|
||||
flowFileContext = getFlowFileContext(flowFile, context);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to process {}; routing to failure", new Object[] {flowFile, e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -36,11 +37,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.lookup.LookupService;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.record.path.FieldValue;
|
||||
import org.apache.nifi.record.path.RecordPath;
|
||||
import org.apache.nifi.record.path.RecordPathResult;
|
||||
|
@ -60,19 +63,29 @@ import org.apache.nifi.util.Tuple;
|
|||
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
|
||||
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
|
||||
})
|
||||
@Tags({"lookup", "enrich", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
|
||||
@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
|
||||
@CapabilityDescription("Extracts a field from a Record and looks up its value in a LookupService. If a result is returned by the LookupService, "
|
||||
+ "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then "
|
||||
+ "routed to either the 'matched' relationship or 'unmatched' relationship, indicating whether or not a result was returned by the LookupService, "
|
||||
+ "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), "
|
||||
+ "indicating whether or not a result was returned by the LookupService, "
|
||||
+ "allowing the processor to also function as a Routing processor. If any record in the incoming FlowFile has multiple fields match the configured "
|
||||
+ "Lookup RecordPath or if no fields match, then that record will be routed to failure. If one or more fields match the Result RecordPath, all fields "
|
||||
+ "that match will be updated.")
|
||||
@SeeAlso({ConvertRecord.class, SplitRecord.class})
|
||||
+ "Lookup RecordPath or if no fields match, then that record will be routed to 'unmatched' (or 'success', depending on the configuration of the 'Routing Strategy' property). "
|
||||
+ "If one or more fields match the Result RecordPath, all fields "
|
||||
+ "that match will be updated. If there is no match in the configured LookupService, then no fields will be updated. I.e., it will not overwrite an existing value in the Record "
|
||||
+ "with a null value. Please note, however, that if the results returned by the LookupService are not accounted for in your schema (specifically, "
|
||||
+ "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.")
|
||||
@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"})
|
||||
public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPath>> {
|
||||
|
||||
private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
|
||||
private volatile LookupService<?> lookupService;
|
||||
|
||||
static final AllowableValue ROUTE_TO_SUCCESS = new AllowableValue("route-to-success", "Route to 'success'",
|
||||
"Records will be routed to a 'success' Relationship regardless of whether or not there is a match in the configured Lookup Service");
|
||||
static final AllowableValue ROUTE_TO_MATCHED_UNMATCHED = new AllowableValue("route-to-matched-unmatched", "Route to 'matched' or 'unmatched'",
|
||||
"Records will be routed to either a 'matched' or an 'unmatched' Relationship depending on whether or not there was a match in the configured Lookup Service. "
|
||||
+ "A single input FlowFile may result in two different output FlowFiles.");
|
||||
|
||||
static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("lookup-service")
|
||||
.displayName("Lookup Service")
|
||||
|
@ -101,6 +114,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("routing-strategy")
|
||||
.displayName("Routing Strategy")
|
||||
.description("Specifies how to route records after a Lookup has completed")
|
||||
.expressionLanguageSupported(false)
|
||||
.allowableValues(ROUTE_TO_SUCCESS, ROUTE_TO_MATCHED_UNMATCHED)
|
||||
.defaultValue(ROUTE_TO_SUCCESS.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final Relationship REL_MATCHED = new Relationship.Builder()
|
||||
.name("matched")
|
||||
.description("All records for which the lookup returns a value will be routed to this relationship")
|
||||
|
@ -109,11 +132,17 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
|
|||
.name("unmatched")
|
||||
.description("All records for which the lookup does not have a matching value will be routed to this relationship")
|
||||
.build();
|
||||
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("All records will be sent to this Relationship if configured to do so, unless a failure occurs")
|
||||
.build();
|
||||
|
||||
private static final Set<Relationship> MATCHED_COLLECTION = Collections.singleton(REL_MATCHED);
|
||||
private static final Set<Relationship> UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED);
|
||||
private static final Set<Relationship> FAILURE_COLLECTION = Collections.singleton(REL_FAILURE);
|
||||
private static final Set<Relationship> SUCCESS_COLLECTION = Collections.singleton(REL_SUCCESS);
|
||||
|
||||
private volatile Set<Relationship> relationships = new HashSet<>(Arrays.asList(new Relationship[] {REL_SUCCESS, REL_FAILURE}));
|
||||
private volatile boolean routeToMatchedUnmatched = false;
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
|
@ -122,10 +151,6 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
|
|||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_MATCHED);
|
||||
relationships.add(REL_UNMATCHED);
|
||||
relationships.add(REL_FAILURE);
|
||||
return relationships;
|
||||
}
|
||||
|
||||
|
@ -136,9 +161,32 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
|
|||
properties.add(LOOKUP_SERVICE);
|
||||
properties.add(LOOKUP_RECORD_PATH);
|
||||
properties.add(RESULT_RECORD_PATH);
|
||||
properties.add(ROUTING_STRATEGY);
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
if (ROUTING_STRATEGY.equals(descriptor)) {
|
||||
if (ROUTE_TO_MATCHED_UNMATCHED.getValue().equalsIgnoreCase(newValue)) {
|
||||
final Set<Relationship> matchedUnmatchedRels = new HashSet<>();
|
||||
matchedUnmatchedRels.add(REL_MATCHED);
|
||||
matchedUnmatchedRels.add(REL_UNMATCHED);
|
||||
matchedUnmatchedRels.add(REL_FAILURE);
|
||||
this.relationships = matchedUnmatchedRels;
|
||||
|
||||
this.routeToMatchedUnmatched = true;
|
||||
} else {
|
||||
final Set<Relationship> successRels = new HashSet<>();
|
||||
successRels.add(REL_SUCCESS);
|
||||
successRels.add(REL_FAILURE);
|
||||
this.relationships = successRels;
|
||||
|
||||
this.routeToMatchedUnmatched = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
|
||||
final Tuple<RecordPath, RecordPath> flowFileContext) {
|
||||
|
@ -147,14 +195,17 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
|
|||
final List<FieldValue> lookupFieldValues = lookupPathResult.getSelectedFields()
|
||||
.filter(fieldVal -> fieldVal.getValue() != null)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (lookupFieldValues.isEmpty()) {
|
||||
getLogger().error("Lookup RecordPath did not match any fields in a record for {}; routing record to failure", new Object[] {flowFile});
|
||||
return FAILURE_COLLECTION;
|
||||
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
|
||||
getLogger().debug("Lookup RecordPath did not match any fields in a record for {}; routing record to " + rels, new Object[] {flowFile});
|
||||
return rels;
|
||||
}
|
||||
|
||||
if (lookupFieldValues.size() > 1) {
|
||||
getLogger().error("Lookup RecordPath matched {} fields in a record for {}; routing record to failure", new Object[] {lookupFieldValues.size(), flowFile});
|
||||
return FAILURE_COLLECTION;
|
||||
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
|
||||
getLogger().debug("Lookup RecordPath matched {} fields in a record for {}; routing record to " + rels, new Object[] {lookupFieldValues.size(), flowFile});
|
||||
return rels;
|
||||
}
|
||||
|
||||
final FieldValue fieldValue = lookupFieldValues.get(0);
|
||||
|
@ -164,12 +215,12 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
|
|||
try {
|
||||
lookupValue = lookupService.lookup(lookupKey);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to lookup value '{}' in Lookup Service for a record in {}; routing record to failure", new Object[] {lookupKey, flowFile, e});
|
||||
return Collections.singleton(REL_FAILURE);
|
||||
throw new ProcessException("Failed to lookup value '" + lookupKey + "' in Lookup Service", e);
|
||||
}
|
||||
|
||||
if (!lookupValue.isPresent()) {
|
||||
return UNMATCHED_COLLECTION;
|
||||
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
|
||||
return rels;
|
||||
}
|
||||
|
||||
// Ensure that the Record has the appropriate schema to account for the newly added values
|
||||
|
@ -182,7 +233,8 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
|
|||
resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
|
||||
}
|
||||
|
||||
return MATCHED_COLLECTION;
|
||||
final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
|
||||
return rels;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,419 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.record.path.FieldValue;
|
||||
import org.apache.nifi.record.path.RecordPath;
|
||||
import org.apache.nifi.record.path.util.RecordPathCache;
|
||||
import org.apache.nifi.record.path.validation.RecordPathValidator;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
@EventDriven
|
||||
@SupportsBatching
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates one or more RecordPaths against the "
|
||||
+ "each record in the incoming FlowFile. Each record is then grouped with other \"like records\" and a FlowFile is created for each group of \"like records.\" What it means for "
|
||||
+ "two records to be \"like records\" is determined by user-defined properties. The user is required to enter at least one user-defined property whose value is a RecordPath. Two "
|
||||
+ "records are considered alike if they have the same value for all configured RecordPaths. Because we know that all records in a given output FlowFile have the same value for the "
|
||||
+ "fields that are specified by the RecordPath, an attribute is added for each field. See Additional Details on the Usage page for more information and examples.")
|
||||
@DynamicProperty(name="The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associted RecordPath.",
|
||||
value="A RecordPath that points to a field in the Record.",
|
||||
description="Each dynamic property represents a RecordPath that will be evaluated against each record in an incoming FlowFile. When the value of the RecordPath is determined "
|
||||
+ "for a Record, an attribute is added to the outgoing FlowFile. The name of the attribute is the same as the name of this property. The value of the attribute is the same as "
|
||||
+ "the value of the field in the Record that the RecordPath points to. Note that no attribute will be added if the value returned for the RecordPath is null or is not a scalar "
|
||||
+ "value (i.e., the value is an Array, Map, or Record).",
|
||||
supportsExpressionLanguage=true)
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile"),
|
||||
@WritesAttribute(attribute="mime.type", description="The MIME Type that the configured Record Writer indicates is appropriate"),
|
||||
@WritesAttribute(attribute="<dynamic property name>",
|
||||
description = "For each dynamic property that is added, an attribute may be added to the FlowFile. See the description for Dynamic Properties for more information.")
|
||||
})
|
||||
@Tags({"record", "partition", "recordpath", "rpath", "segment", "split", "group", "bin", "organize"})
|
||||
@SeeAlso({ConvertRecord.class, SplitRecord.class, UpdateRecord.class, QueryRecord.class})
|
||||
|
||||
public class PartitionRecord extends AbstractProcessor {
|
||||
private final RecordPathCache recordPathCache = new RecordPathCache(25);
|
||||
|
||||
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
|
||||
.name("record-reader")
|
||||
.displayName("Record Reader")
|
||||
.description("Specifies the Controller Service to use for reading incoming data")
|
||||
.identifiesControllerService(RecordReaderFactory.class)
|
||||
.required(true)
|
||||
.build();
|
||||
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||
.name("record-writer")
|
||||
.displayName("Record Writer")
|
||||
.description("Specifies the Controller Service to use for writing out the records")
|
||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("FlowFiles that are successfully partitioned will be routed to this relationship")
|
||||
.build();
|
||||
static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
||||
.name("original")
|
||||
.description("Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship.")
|
||||
.build();
|
||||
static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("If a FlowFile cannot be partitioned from the configured input format to the configured output format, "
|
||||
+ "the unchanged FlowFile will be routed to this relationship")
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(RECORD_READER);
|
||||
properties.add(RECORD_WRITER);
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
relationships.add(REL_FAILURE);
|
||||
relationships.add(REL_ORIGINAL);
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final boolean hasDynamic = validationContext.getProperties().keySet().stream()
|
||||
.anyMatch(prop -> prop.isDynamic());
|
||||
|
||||
if (hasDynamic) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return Collections.singleton(new ValidationResult.Builder()
|
||||
.subject("User-defined Properties")
|
||||
.valid(false)
|
||||
.explanation("At least one RecordPath must be added to this processor by adding a user-defined property")
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.dynamic(true)
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(new RecordPathValidator())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
final RecordSchema writeSchema;
|
||||
try (final InputStream rawIn = session.read(flowFile);
|
||||
final InputStream in = new BufferedInputStream(rawIn)) {
|
||||
writeSchema = writerFactory.getSchema(flowFile, in);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to partition records for {}; will route to failure", new Object[] {flowFile, e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<String, RecordPath> recordPaths;
|
||||
try {
|
||||
recordPaths = context.getProperties().keySet().stream()
|
||||
.filter(prop -> prop.isDynamic())
|
||||
.collect(Collectors.toMap(
|
||||
prop -> prop.getName(),
|
||||
prop -> getRecordPath(context, prop, flowFile)));
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to compile RecordPath for {}; routing to failure", new Object[] {flowFile, e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<RecordValueMap, RecordSetWriter> writerMap = new HashMap<>();
|
||||
|
||||
try (final InputStream in = session.read(flowFile)) {
|
||||
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
|
||||
|
||||
Record record;
|
||||
while ((record = reader.nextRecord()) != null) {
|
||||
final Map<String, List<ValueWrapper>> recordMap = new HashMap<>();
|
||||
|
||||
// Evaluate all of the RecordPath's for this Record
|
||||
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
|
||||
final String propName = entry.getKey();
|
||||
final RecordPath recordPath = entry.getValue();
|
||||
|
||||
final Stream<FieldValue> fieldValueStream = recordPath.evaluate(record).getSelectedFields();
|
||||
final List<ValueWrapper> fieldValues = fieldValueStream
|
||||
.map(fieldVal -> new ValueWrapper(fieldVal.getValue()))
|
||||
.collect(Collectors.toList());
|
||||
recordMap.put(propName, fieldValues);
|
||||
}
|
||||
|
||||
final RecordValueMap recordValueMap = new RecordValueMap(recordMap);
|
||||
|
||||
// Get the RecordSetWriter that contains the same values for all RecordPaths - or create one if none exists.
|
||||
RecordSetWriter writer = writerMap.get(recordValueMap);
|
||||
if (writer == null) {
|
||||
final FlowFile childFlowFile = session.create(flowFile);
|
||||
recordValueMap.setFlowFile(childFlowFile);
|
||||
|
||||
final OutputStream out = session.write(childFlowFile);
|
||||
|
||||
writer = writerFactory.createWriter(getLogger(), writeSchema, childFlowFile, out);
|
||||
writer.beginRecordSet();
|
||||
writerMap.put(recordValueMap, writer);
|
||||
}
|
||||
|
||||
writer.write(record);
|
||||
}
|
||||
|
||||
// For each RecordSetWriter, finish the record set and close the writer.
|
||||
for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : writerMap.entrySet()) {
|
||||
final RecordValueMap valueMap = entry.getKey();
|
||||
final RecordSetWriter writer = entry.getValue();
|
||||
|
||||
final WriteResult writeResult = writer.finishRecordSet();
|
||||
writer.close();
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.putAll(valueMap.getAttributes());
|
||||
attributes.putAll(writeResult.getAttributes());
|
||||
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
|
||||
|
||||
FlowFile childFlowFile = valueMap.getFlowFile();
|
||||
childFlowFile = session.putAllAttributes(childFlowFile, attributes);
|
||||
|
||||
session.adjustCounter("Record Processed", writeResult.getRecordCount(), false);
|
||||
}
|
||||
|
||||
} catch (final Exception e) {
|
||||
for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : writerMap.entrySet()) {
|
||||
final RecordValueMap valueMap = entry.getKey();
|
||||
final RecordSetWriter writer = entry.getValue();
|
||||
|
||||
try {
|
||||
writer.close();
|
||||
} catch (final IOException e1) {
|
||||
getLogger().warn("Failed to close Record Writer for {}; some resources may not be cleaned up appropriately", new Object[] {flowFile, e1});
|
||||
}
|
||||
|
||||
session.remove(valueMap.getFlowFile());
|
||||
}
|
||||
|
||||
|
||||
getLogger().error("Failed to partition {}", new Object[] {flowFile, e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
// Transfer the FlowFiles. We wait until the end to do this, in case any IOException is thrown above,
|
||||
// because we want to ensure that we are able to remove the child flowfiles in case of a failure.
|
||||
for (final RecordValueMap valueMap : writerMap.keySet()) {
|
||||
session.transfer(valueMap.getFlowFile(), REL_SUCCESS);
|
||||
}
|
||||
|
||||
session.transfer(flowFile, REL_ORIGINAL);
|
||||
}
|
||||
|
||||
private RecordPath getRecordPath(final ProcessContext context, final PropertyDescriptor prop, final FlowFile flowFile) {
|
||||
final String pathText = context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final RecordPath recordPath = recordPathCache.getCompiled(pathText);
|
||||
return recordPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* We have this ValueWrapper class here because we want to use it as part of the key to a Map and we
|
||||
* want two values that may or may not be arrays. Since calling a.equals(b) returns false when a and b
|
||||
* are arrays, we need to wrap our values in a class that can handle comparisons appropriately.
|
||||
*/
|
||||
static class ValueWrapper {
|
||||
private final Object value;
|
||||
|
||||
public ValueWrapper(final Object value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public Object get() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
if (value == null) {
|
||||
return 31;
|
||||
}
|
||||
|
||||
if (value instanceof Object[]) {
|
||||
return 31 + Arrays.deepHashCode((Object[]) value);
|
||||
}
|
||||
|
||||
return 31 + value.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof ValueWrapper)) {
|
||||
return false;
|
||||
}
|
||||
final ValueWrapper other = (ValueWrapper) obj;
|
||||
if (value == null && other.value == null) {
|
||||
return true;
|
||||
}
|
||||
if (value == null || other.value == null) {
|
||||
return false;
|
||||
}
|
||||
if (value instanceof Object[] && other.value instanceof Object[]) {
|
||||
return Arrays.equals((Object[]) value, (Object[]) other.value);
|
||||
}
|
||||
return value.equals(other.value);
|
||||
}
|
||||
}
|
||||
|
||||
private static class RecordValueMap {
|
||||
private final Map<String, List<ValueWrapper>> values;
|
||||
private FlowFile flowFile;
|
||||
|
||||
public RecordValueMap(final Map<String, List<ValueWrapper>> values) {
|
||||
this.values = values;
|
||||
}
|
||||
|
||||
public Map<String, String> getAttributes() {
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
for (final Map.Entry<String, List<ValueWrapper>> entry : values.entrySet()) {
|
||||
final List<ValueWrapper> values = entry.getValue();
|
||||
|
||||
// If there are no values or there are multiple values, don't create an attribute.
|
||||
if (values.size() != 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If value is null, don't create an attribute
|
||||
final Object value = values.get(0).get();
|
||||
if (value == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If value is not scalar, don't create an attribute
|
||||
if (value instanceof Object[] || value instanceof Map || value instanceof Record) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// There exists a single value that is scalar. Create attribute using the property name as the attribute name
|
||||
final String attributeValue = DataTypeUtils.toString(value, (String) null);
|
||||
attributes.put(entry.getKey(), attributeValue);
|
||||
}
|
||||
|
||||
return attributes;
|
||||
}
|
||||
|
||||
public FlowFile getFlowFile() {
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
public void setFlowFile(final FlowFile flowFile) {
|
||||
this.flowFile = flowFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 41 + 37 * values.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof RecordValueMap)) {
|
||||
return false;
|
||||
}
|
||||
final RecordValueMap other = (RecordValueMap) obj;
|
||||
return values.equals(other.values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RecordMapValue[" + values + "]";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,7 +27,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
|
@ -47,7 +46,6 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
|
|
|
@ -66,6 +66,7 @@ org.apache.nifi.processors.standard.MonitorActivity
|
|||
org.apache.nifi.processors.standard.Notify
|
||||
org.apache.nifi.processors.standard.ParseCEF
|
||||
org.apache.nifi.processors.standard.ParseSyslog
|
||||
org.apache.nifi.processors.standard.PartitionRecord
|
||||
org.apache.nifi.processors.standard.PostHTTP
|
||||
org.apache.nifi.processors.standard.PutDatabaseRecord
|
||||
org.apache.nifi.processors.standard.PutDistributedMapCache
|
||||
|
|
|
@ -0,0 +1,190 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<title>PartitionRecord</title>
|
||||
|
||||
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<p>
|
||||
PartitionRecord allows the user to separate out records in a FlowFile such that each outgoing FlowFile
|
||||
consists only of records that are "alike." To define what it means for two records to be alike, the Processor
|
||||
makes use of NiFi's <a href="/nifi-docs/html/record-path-guide.html">RecordPath</a> DSL.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
In order to make the Processor valid, at least one user-defined property must be added to the Processor.
|
||||
The value of the property must be a valid RecordPath. Expression Language is supported and will be evaluated before
|
||||
attempting to compile the RecordPath. However, if Expression Language is used, the Processor is not able to validate
|
||||
the RecordPath before-hand and may result in having FlowFiles fail processing if the RecordPath is not valid when being
|
||||
used.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Once one or more RecordPath's have been added, those RecordPath's are evaluated against each Record in an incoming FlowFile.
|
||||
In order for Record A and Record B to be considered "like records," both of them must have the same value for all RecordPath's
|
||||
that are configured. Only the values that are returned by the RecordPath are held in Java's heap. The records themselves are written
|
||||
immediately to the FlowFile content. This means that for most cases, heap usage is not a concern. However, if the RecordPath points
|
||||
to a large Record field that is different for each record in a FlowFile, then heap usage may be an important consideration. In such
|
||||
cases, SplitRecord may be useful to split a large FlowFile into smaller FlowFiles before partitioning.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Once a FlowFile has been written, we know that all of the Records within that FlowFile have the same value for the fields that are
|
||||
described by the configured RecordPath's. As a result, this means that we can promote those values to FlowFile Attributes. We do so
|
||||
by looking at the name of the property to which each RecordPath belongs. For example, if we have a property named <code>country</code>
|
||||
with a value of <code>/geo/country/name</code>, then each outbound FlowFile will have an attribute named <code>country</code> with the
|
||||
value of the <code>/geo/country/name</code> field. The addition of these attributes makes it very easy to perform tasks such as routing,
|
||||
or referencing the value in another Processor that can be used for configuring where to send the data, etc.
|
||||
However, for any RecordPath whose value is not a scalar value (i.e., the value is of type Array, Map, or Record), no attribute will be added.
|
||||
</p>
|
||||
|
||||
|
||||
|
||||
<h2>Examples</h2>
|
||||
|
||||
<p>
|
||||
To better understand how this Processor works, we will lay out a few examples. For the sake of these examples, let's assume that our input
|
||||
data is JSON formatted and looks like this:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
[ {
|
||||
"name": "John Doe",
|
||||
"dob": "11/30/1976",
|
||||
"favorites": [ "spaghetti", "basketball", "blue" ],
|
||||
"locations": {
|
||||
"home": {
|
||||
"number": 123,
|
||||
"street": "My Street",
|
||||
"city": "New York",
|
||||
"state": "NY",
|
||||
"country": "US"
|
||||
},
|
||||
"work": {
|
||||
"number": 321,
|
||||
"street": "Your Street",
|
||||
"city": "New York",
|
||||
"state": "NY",
|
||||
"country": "US"
|
||||
}
|
||||
}
|
||||
}, {
|
||||
"name": "Jane Doe",
|
||||
"dob": "10/04/1979",
|
||||
"favorites": [ "spaghetti", "football", "red" ],
|
||||
"locations": {
|
||||
"home": {
|
||||
"number": 123,
|
||||
"street": "My Street",
|
||||
"city": "New York",
|
||||
"state": "NY",
|
||||
"country": "US"
|
||||
},
|
||||
"work": {
|
||||
"number": 456,
|
||||
"street": "Our Street",
|
||||
"city": "New York",
|
||||
"state": "NY",
|
||||
"country": "US"
|
||||
}
|
||||
}
|
||||
}, {
|
||||
"name": "Jacob Doe",
|
||||
"dob": "04/02/2012",
|
||||
"favorites": [ "chocolate", "running", "yellow" ],
|
||||
"locations": {
|
||||
"home": {
|
||||
"number": 123,
|
||||
"street": "My Street",
|
||||
"city": "New York",
|
||||
"state": "NY",
|
||||
"country": "US"
|
||||
},
|
||||
"work": null
|
||||
}
|
||||
}, {
|
||||
"name": "Janet Doe",
|
||||
"dob": "02/14/2007",
|
||||
"favorites": [ "spaghetti", "reading", "white" ],
|
||||
"locations": {
|
||||
"home": {
|
||||
"number": 1111,
|
||||
"street": "Far Away",
|
||||
"city": "San Francisco",
|
||||
"state": "CA",
|
||||
"country": "US"
|
||||
},
|
||||
"work": null
|
||||
}
|
||||
}]
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
|
||||
<h3>Example 1 - Partition By Simple Field</h3>
|
||||
|
||||
<p>
|
||||
For a simple case, let's partition all of the records based on the state that they live in.
|
||||
We can add a property named <code>state</code> with a value of <code>/locations/home/state</code>.
|
||||
The result will be that we will have two outbound FlowFiles. The first will contain an attribute with the name
|
||||
<code>state</code> and a value of <code>NY</code>. This FlowFile will consist of 3 records: John Doe, Jane Doe, and Jacob Doe.
|
||||
The second FlowFile will consist of a single record for Janet Doe and will contain an attribute named <code>state</code> that
|
||||
has a value of <code>CA</code>.
|
||||
</p>
|
||||
|
||||
|
||||
<h3>Example 2 - Partition By Nullable Value</h3>
|
||||
|
||||
<p>
|
||||
In the above example, there are three different values for the work location. If we use a RecordPath of <code>/locations/work/state</code>
|
||||
with a property name of <code>state</code>, then we will end up with two different FlowFiles. The first will contain records for John Doe and Jane Doe
|
||||
because they have the same value for the given RecordPath. This FlowFile will have an attribute named <code>state</code> with a value of <code>NY</code>.
|
||||
</p>
|
||||
<p>
|
||||
The second FlowFile will contain the two records for Jacob Doe and Janet Doe, because the RecordPath will evaluate
|
||||
to <code>null</code> for both of them. This FlowFile will have no <code>state</code> attribute (unless such an attribute existed on the incoming FlowFile,
|
||||
in which case its value will be unaltered).
|
||||
</p>
|
||||
|
||||
|
||||
<h3>Example 3 - Partition By Multiple Values</h3>
|
||||
|
||||
<p>
|
||||
Now let's say that we want to partition records based on multiple different fields. We now add two properties to the PartitionRecord processor.
|
||||
The first property is named <code>home</code> and has a value of <code>/locations/home</code>. The second property is named <code>favorite.food</code>
|
||||
and has a value of <code>/favorites[0]</code> to reference the first element in the "favorites" array.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
This will result in three different FlowFiles being created. The first FlowFile will contain records for John Doe and Jane Doe. If will contain an attribute
|
||||
named "favorite.food" with a value of "spaghetti." However, because the second RecordPath pointed to a Record field, no "home" attribute will be added.
|
||||
In this case, both of these records have the same value for both the first element of the "favorites" array
|
||||
and the same value for the home address. Janet Doe has the same value for the first element in the "favorites" array but has a different home address. Similarly,
|
||||
Jacob Doe has the same home address but a different value for the favorite food.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The second FlowFile will consist of a single record: Jacob Doe. This FlowFile will have an attribute named "favorite.food" with a value of "chocolate."
|
||||
The third FlowFile will consist of a single record: Janet Doe. This FlowFile will have an attribute named "favorite.food" with a value of "spaghetti."
|
||||
</p>
|
||||
|
||||
</body>
|
||||
</html>
|
|
@ -59,6 +59,7 @@ public class TestLookupRecord {
|
|||
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
|
||||
runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name");
|
||||
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
|
||||
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
|
||||
|
||||
recordReader.addSchemaField("name", RecordFieldType.STRING);
|
||||
recordReader.addSchemaField("age", RecordFieldType.INT);
|
||||
|
@ -149,8 +150,8 @@ public class TestLookupRecord {
|
|||
runner.enqueue("");
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
|
||||
runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
|
||||
|
||||
out.assertAttributeEquals("record.count", "3");
|
||||
out.assertAttributeEquals("mime.type", "text/plain");
|
||||
|
@ -201,8 +202,8 @@ public class TestLookupRecord {
|
|||
runner.enqueue("");
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
|
||||
runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
|
||||
|
||||
out.assertAttributeEquals("record.count", "3");
|
||||
out.assertAttributeEquals("mime.type", "text/plain");
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestPartitionRecord {
|
||||
|
||||
private TestRunner runner;
|
||||
private MockRecordParser readerService;
|
||||
private MockRecordWriter writerService;
|
||||
|
||||
@Before
|
||||
public void setup() throws InitializationException {
|
||||
readerService = new MockRecordParser();
|
||||
writerService = new MockRecordWriter(null, false);
|
||||
|
||||
runner = TestRunners.newTestRunner(PartitionRecord.class);
|
||||
runner.addControllerService("reader", readerService);
|
||||
runner.enableControllerService(readerService);
|
||||
runner.addControllerService("writer", writerService);
|
||||
runner.enableControllerService(writerService);
|
||||
|
||||
runner.setProperty(PartitionRecord.RECORD_READER, "reader");
|
||||
runner.setProperty(PartitionRecord.RECORD_WRITER, "writer");
|
||||
|
||||
readerService.addSchemaField("name", RecordFieldType.STRING);
|
||||
readerService.addSchemaField("age", RecordFieldType.INT);
|
||||
readerService.addSchemaField("sports", RecordFieldType.ARRAY);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void groupByStringMixedNumberOfRecords() {
|
||||
runner.setProperty("person-name", "/name");
|
||||
|
||||
readerService.addRecord("John", 28, null);
|
||||
readerService.addRecord("Jake", 49, null);
|
||||
readerService.addRecord("Mark", 19, null);
|
||||
readerService.addRecord("Jane", 20, null);
|
||||
readerService.addRecord("Jake", 14, null);
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
|
||||
runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
|
||||
runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 4);
|
||||
|
||||
final List<MockFlowFile> out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
|
||||
|
||||
assertEquals(3L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("1")).count());
|
||||
assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).count());
|
||||
|
||||
out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).forEach(ff -> ff.assertContentEquals("Jake,49,\nJake,14,\n"));
|
||||
|
||||
for (final String name : new String[] {"John", "Jake", "Mark", "Jane"}) {
|
||||
assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("person-name").equals(name)).count());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupByIntAllRecordsTogether() {
|
||||
runner.setProperty("age", "/age");
|
||||
|
||||
readerService.addRecord("John", 30, null);
|
||||
readerService.addRecord("Jake", 30, null);
|
||||
readerService.addRecord("Mark", 30, null);
|
||||
readerService.addRecord("Jane", 30, null);
|
||||
readerService.addRecord("Jake", 30, null);
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
|
||||
runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
|
||||
runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 1);
|
||||
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS).get(0);
|
||||
out.assertAttributeEquals("record.count", "5");
|
||||
out.assertContentEquals("John,30,\nJake,30,\nMark,30,\nJane,30,\nJake,30,\n");
|
||||
out.assertAttributeEquals("age", "30");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGroupByMultipleFields() {
|
||||
runner.setProperty("age", "/age");
|
||||
runner.setProperty("name", "/name");
|
||||
|
||||
readerService.addRecord("John", 30, null);
|
||||
readerService.addRecord("Jane", 30, null);
|
||||
readerService.addRecord("John", 30, null);
|
||||
readerService.addRecord("John", 31, null);
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
|
||||
runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
|
||||
runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 3);
|
||||
|
||||
final List<MockFlowFile> out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
|
||||
assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,30,\nJohn,30,\n") && mff.isAttributeEqual("age", "30") && mff.isAttributeEqual("name", "John")).count());
|
||||
assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("Jane,30,\n") && mff.isAttributeEqual("age", "30") && mff.isAttributeEqual("name", "Jane")).count());
|
||||
assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,31,\n") && mff.isAttributeEqual("age", "31") && mff.isAttributeEqual("name", "John")).count());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGroupByArrayField() {
|
||||
runner.setProperty("sports", "/sports");
|
||||
|
||||
readerService.addRecord("John", 30, new String[] {"baseball"});
|
||||
readerService.addRecord("Jane", 30, new String[] {"baseball"});
|
||||
readerService.addRecord("John", 30, new String[] {"basketball"});
|
||||
readerService.addRecord("John", 31, new String[] {"football"});
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
|
||||
runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
|
||||
runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 3);
|
||||
|
||||
final List<MockFlowFile> out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
|
||||
assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,30,[baseball]\nJane,30,[baseball]\n")).count());
|
||||
assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,30,[basketball]\n")).count());
|
||||
assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,31,[football]\n")).count());
|
||||
|
||||
// There should be no sports attribute because it's not a scalar value
|
||||
assertTrue(out.stream().noneMatch(mff -> mff.getAttributes().containsKey("sports")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFailure() throws IOException {
|
||||
runner.setProperty("sports", "/sports");
|
||||
readerService.failAfter(2);
|
||||
|
||||
readerService.addRecord("John", 30, new String[] {"baseball"});
|
||||
readerService.addRecord("Jane", 30, new String[] {"baseball"});
|
||||
readerService.addRecord("John", 30, new String[] {"basketball"});
|
||||
readerService.addRecord("John", 31, new String[] {"football"});
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PartitionRecord.REL_FAILURE, 1);
|
||||
runner.getFlowFilesForRelationship(PartitionRecord.REL_FAILURE).get(0).assertContentEquals(new byte[0]);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testValueWrapperEqualityWithArrays() {
|
||||
final Object a = new String[] {"baseball"};
|
||||
final Object b = new String[] {"baseball"};
|
||||
|
||||
assertEquals(new PartitionRecord.ValueWrapper(a), new PartitionRecord.ValueWrapper(b));
|
||||
}
|
||||
|
||||
}
|
|
@ -29,7 +29,7 @@ public interface LookupService<T> extends ControllerService {
|
|||
* @param key the key to lookup
|
||||
* @return a value that corresponds to the given key
|
||||
*
|
||||
* @throws if unable to lookup a value for the given key
|
||||
* @throws LookupFailureException if unable to lookup a value for the given key
|
||||
*/
|
||||
Optional<T> lookup(String key) throws LookupFailureException;
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ public interface RecordLookupService extends LookupService<Record> {
|
|||
* @param key the key to lookup
|
||||
* @return an Optional Record that corresponds to the given key
|
||||
*
|
||||
* @throws if unable to lookup a value for the given key
|
||||
* @throws LookupFailureException if unable to lookup a value for the given key
|
||||
*/
|
||||
@Override
|
||||
Optional<Record> lookup(String key) throws LookupFailureException;
|
||||
|
|
|
@ -26,8 +26,6 @@ public interface StringLookupService extends LookupService<String> {
|
|||
*
|
||||
* @param key the key to lookup
|
||||
* @return an Optional String that represents the value for the given key
|
||||
*
|
||||
* @throws if unable to lookup a value for the given key
|
||||
*/
|
||||
@Override
|
||||
Optional<String> lookup(String key);
|
||||
|
|
|
@ -25,35 +25,40 @@
|
|||
<p>
|
||||
The IPLookupService is powered by a MaxMind database and can return several different types of enrichment information
|
||||
about a given IP address. Below is the schema of the Record that is returned by this service (in Avro Schema format).
|
||||
The schema is for a single record that consists of several fields: <code>geo</code>, <code>isp</code>,
|
||||
<code>domainName</code>, <code>connectionType</code>, and <code>anonymousIp</code>. Each of these fields is nullable
|
||||
and will be populated only if the IP address that is searched for has the relevant information in the MaxMind database
|
||||
and if the Controller Service is configured to return such information. Because each of the fields requires a separate
|
||||
lookup in the database, it is advisable to retrieve only those fields that are of value.
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
{
|
||||
"name": "ipEnrichment",
|
||||
"name": "enrichmentRecord",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "geo",
|
||||
"type": {
|
||||
"type": ["null", {
|
||||
"name": "cityGeo",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "city", "type": "string" },
|
||||
{ "name": "accuracy", "type": "int", "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
|
||||
{ "name": "metroCode", "type": "int" },
|
||||
{ "name": "timeZone", "type": "string" },
|
||||
{ "name": "latitude", "type": "double" },
|
||||
{ "name": "longitude", "type": "double" },
|
||||
{ "name": "country", "type": {
|
||||
{ "name": "city", "type": ["null", "string"] },
|
||||
{ "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
|
||||
{ "name": "metroCode", "type": ["null", "int"] },
|
||||
{ "name": "timeZone", "type": ["null", "string"] },
|
||||
{ "name": "latitude", "type": ["null", "double"] },
|
||||
{ "name": "longitude", "type": ["null", "double"] },
|
||||
{ "name": "country", "type": ["null", {
|
||||
"type": "record",
|
||||
"name": "country",
|
||||
"fields": [
|
||||
{ "name": "name", "type": "string" },
|
||||
{ "name": "isoCode", "type": "string" }
|
||||
]
|
||||
} },
|
||||
}] },
|
||||
{ "name": "subdivisions", "type": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
|
@ -66,37 +71,109 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
{ "name": "continent", "type": "string" },
|
||||
{ "name": "postalCode", "type": "string" }
|
||||
{ "name": "continent", "type": ["null", "string"] },
|
||||
{ "name": "postalCode", "type": ["null", "string"] }
|
||||
]
|
||||
}
|
||||
}]
|
||||
},
|
||||
{
|
||||
"name": "isp",
|
||||
"type": {
|
||||
"name": "ispEnrich",
|
||||
"type": ["null", {
|
||||
"name": "ispEnrich",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "name", "type": "string" },
|
||||
{ "name": "organization", "type": "string" },
|
||||
{ "name": "asn", "type": "int" },
|
||||
{ "name": "asnOrganization", "type": "string" }
|
||||
{ "name": "name", "type": ["null", "string"] },
|
||||
{ "name": "organization", "type": ["null", "string"] },
|
||||
{ "name": "asn", "type": ["null", "int"] },
|
||||
{ "name": "asnOrganization", "type": ["null", "string"] }
|
||||
]
|
||||
}
|
||||
}]
|
||||
},
|
||||
{
|
||||
"name": "domainName",
|
||||
"type": "string"
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
{
|
||||
"name": "connectionType",
|
||||
"type": "string",
|
||||
"type": ["null", "string"],
|
||||
"doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
|
||||
},
|
||||
{
|
||||
"name": "anonymousIp",
|
||||
"type": ["null", {
|
||||
"name": "anonymousIpType",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "anonymous", "type": "boolean" },
|
||||
{ "name": "anonymousVpn", "type": "boolean" },
|
||||
{ "name": "hostingProvider", "type": "boolean" },
|
||||
{ "name": "publicProxy", "type": "boolean" },
|
||||
{ "name": "torExitNode", "type": "boolean" }
|
||||
]
|
||||
}]
|
||||
}
|
||||
]
|
||||
}
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
</body>
|
||||
|
||||
<p>
|
||||
While this schema is fairly complex, it is a single record with 5 fields. This makes it quite easy to update
|
||||
an existing schema to allow for this record, by adding a new field to an existing schema and pasting in the schema
|
||||
above as the type.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
For example, suppose that we have an existing schema that is as simple as:
|
||||
</p>
|
||||
|
||||
<pre>
|
||||
<code>
|
||||
<span style="color: #808080;">
|
||||
{
|
||||
"name": "ipRecord",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "ip", "type": "string" }
|
||||
]
|
||||
}
|
||||
</span>
|
||||
</code>
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
Now, let's suppose that we want to add a new field named <code>enrichment</code> to the above schema.
|
||||
Further, let's say that we want the new <code>enrichment</code> field to be nullable.
|
||||
We can do so by copying and pasting our enrichment schema from above thus:
|
||||
</p>
|
||||
|
||||
<pre>
|
||||
<code>
|
||||
<span style="color: #808080;">
|
||||
{
|
||||
"name": "ipRecord",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "ip", "type": "string" },</span><span style="color: #191970;">
|
||||
{ "name": "enrichment", "type": ["null",
|
||||
</span>
|
||||
|
||||
<span style="color: #000000"><Paste Enrichment Schema Here></span>
|
||||
|
||||
<span style="color: #191970;">
|
||||
]</span><span style="color: #808080;">
|
||||
}
|
||||
]
|
||||
}
|
||||
</span>
|
||||
</code>
|
||||
</pre>
|
||||
|
||||
|
||||
|
||||
|
||||
</body>
|
||||
</html>
|
|
@ -30,7 +30,6 @@ import org.apache.avro.io.DatumWriter;
|
|||
import org.apache.avro.io.EncoderFactory;
|
||||
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
||||
import org.apache.nifi.serialization.AbstractRecordSetWriter;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
|
@ -68,10 +67,10 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public WriteResult write(final Record record) throws IOException {
|
||||
public Map<String, String> writeRecord(final Record record) throws IOException {
|
||||
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
|
||||
datumWriter.write(rec, encoder);
|
||||
return WriteResult.of(1, schemaAccessWriter.getAttributes(recordSchema));
|
||||
return schemaAccessWriter.getAttributes(recordSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,13 +20,13 @@ package org.apache.nifi.avro;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.file.DataFileWriter;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.nifi.serialization.AbstractRecordSetWriter;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
||||
public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
|
||||
|
@ -49,10 +49,10 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public WriteResult write(final Record record) throws IOException {
|
||||
public Map<String, String> writeRecord(final Record record) throws IOException {
|
||||
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
|
||||
dataFileWriter.append(rec);
|
||||
return WriteResult.of(1, Collections.emptyMap());
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.commons.csv.CSVPrinter;
|
|||
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
||||
import org.apache.nifi.serialization.AbstractRecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
|
@ -90,14 +89,14 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
|
|||
}
|
||||
|
||||
@Override
|
||||
public WriteResult write(final Record record) throws IOException {
|
||||
public Map<String, String> writeRecord(final Record record) throws IOException {
|
||||
int i = 0;
|
||||
for (final RecordField recordField : recordSchema.getFields()) {
|
||||
fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
|
||||
}
|
||||
|
||||
printer.printRecord(fieldValues);
|
||||
return WriteResult.of(1, schemaWriter.getAttributes(recordSchema));
|
||||
return schemaWriter.getAttributes(recordSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
||||
import org.apache.nifi.serialization.AbstractRecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
|
@ -97,9 +96,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
|||
}
|
||||
|
||||
@Override
|
||||
public WriteResult write(final Record record) throws IOException {
|
||||
public Map<String, String> writeRecord(final Record record) throws IOException {
|
||||
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
|
||||
return WriteResult.of(1, schemaAccess.getAttributes(recordSchema));
|
||||
return schemaAccess.getAttributes(recordSchema);
|
||||
}
|
||||
|
||||
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Map;
|
|||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.serialization.AbstractRecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
@ -60,9 +59,9 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
|
|||
}
|
||||
|
||||
@Override
|
||||
public WriteResult write(final Record record) throws IOException {
|
||||
public Map<String, String> writeRecord(final Record record) throws IOException {
|
||||
write(record, out, getColumnNames(record.getSchema()));
|
||||
return WriteResult.of(1, Collections.emptyMap());
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
private void write(final Record record, final OutputStream out, final List<String> columnNames) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue